You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/27 08:06:35 UTC

[pulsar] branch branch-2.7 updated (dedaf9f -> bc0587d)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from dedaf9f  Fix C++ client cannot be built on Windows(#10363)
     new 22ac4c8  Fix schema type check issue when use always compatible strategy (#10367)
     new 592c3f8  Fix primitive schema upload for ALWAYS_COMPATIBLE strategy. (#10386)
     new bc0587d  Add CI to verify C++ client could be built on Windows (#10387)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci-cpp-build-windows.yaml        |  79 +++++++
 .../service/schema/BookkeeperSchemaStorage.java    |   7 +-
 .../service/schema/SchemaRegistryServiceImpl.java  |  75 ++++---
 .../SchemaTypeCompatibilityCheckTest.java          | 244 ++++++---------------
 pulsar-client-cpp/.gitignore                       |   7 +
 pulsar-client-cpp/CMakeLists.txt                   |   8 +
 pulsar-client-cpp/README.md                        |  84 +++++--
 pulsar-client-cpp/lib/ConnectionPool.cc            |   1 -
 pulsar-client-cpp/lib/JavaStringHash.cc            |   1 +
 pulsar-client-cpp/lib/JavaStringHash.h             |   1 -
 pulsar-client-cpp/lib/TopicName.cc                 |   1 -
 pulsar-client-cpp/lib/auth/AuthOauth2.h            |   1 -
 pulsar-client-cpp/vcpkg.json                       |  26 +++
 13 files changed, 297 insertions(+), 238 deletions(-)
 create mode 100644 .github/workflows/ci-cpp-build-windows.yaml
 create mode 100644 pulsar-client-cpp/vcpkg.json

[pulsar] 01/03: Fix schema type check issue when use always compatible strategy (#10367)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22ac4c842e1f64cd1d84d54ee2bb4c10fb080caf
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Apr 26 08:27:44 2021 +0800

    Fix schema type check issue when use always compatible strategy (#10367)
    
    Related to #9797
    
    Fix schema type check issue when use always compatible strategy.
    
    1. For non-transitive strategy, only check schema type for the last schema
    2. For transitive strategy, check all schema's type
    3. Get schema by schema data should consider different schema types
    
    (cherry picked from commit 04f8c96a6c0c1c93cd495f46fb33d6e44d6004ea)
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  75 ++++---
 .../SchemaTypeCompatibilityCheckTest.java          | 225 ++++-----------------
 2 files changed, 92 insertions(+), 208 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 37f7c56..0f493ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -141,15 +141,6 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                                                               SchemaCompatibilityStrategy strategy) {
         return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
                 getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> {
-            if (strategy != SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE && schemaAndMetadataList.size() > 0) {
-                for (SchemaAndMetadata metadata : schemaAndMetadataList) {
-                    if (schema.getType() != metadata.schema.getType()) {
-                        return FutureUtil.failedFuture(new IncompatibleSchemaException(
-                                String.format("Incompatible schema: exists schema type %s, new schema type %s",
-                                metadata.schema.getType(), schema.getType())));
-                    }
-                }
-            }
             if (schemaVersion != null) {
                 return CompletableFuture.completedFuture(schemaVersion);
             }
@@ -298,6 +289,9 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(
             List<SchemaAndMetadata> schemaAndMetadataList,
             SchemaData schemaData) {
+        if (schemaAndMetadataList == null || schemaAndMetadataList.size() == 0) {
+            return CompletableFuture.completedFuture(null);
+        }
         final CompletableFuture<SchemaVersion> completableFuture = new CompletableFuture<>();
         SchemaVersion schemaVersion;
         if (isUsingAvroSchemaParser(schemaData.getType())) {
@@ -308,14 +302,15 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                 if (isUsingAvroSchemaParser(schemaData.getType())) {
                     Schema.Parser existParser = new Schema.Parser();
                     Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
-                    if (newSchema.equals(existSchema)) {
+                    if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) {
                         schemaVersion = schemaAndMetadata.version;
                         completableFuture.complete(schemaVersion);
                         return completableFuture;
                     }
                 } else {
                     if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
-                            hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                            hashFunction.hashBytes(schemaData.getData()).asBytes())
+                            && schemaAndMetadata.schema.getType() == schemaData.getType()) {
                         schemaVersion = schemaAndMetadata.version;
                         completableFuture.complete(schemaVersion);
                         return completableFuture;
@@ -325,7 +320,8 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
         } else {
             for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
                 if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
-                        hashFunction.hashBytes(schemaData.getData()).asBytes())) {
+                        hashFunction.hashBytes(schemaData.getData()).asBytes())
+                        && schemaAndMetadata.schema.getType() == schemaData.getType()) {
                     schemaVersion = schemaAndMetadata.version;
                     completableFuture.complete(schemaVersion);
                     return completableFuture;
@@ -338,14 +334,23 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
 
     private CompletableFuture<Void> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
                                                                     SchemaCompatibilityStrategy strategy) {
+        if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) {
+            return CompletableFuture.completedFuture(null);
+        }
         return getSchema(schemaId).thenCompose(existingSchema -> {
             if (existingSchema != null && !existingSchema.schema.isDeleted()) {
                 CompletableFuture<Void> result = new CompletableFuture<>();
-                try {
-                    checkCompatible(existingSchema, schema, strategy);
-                    result.complete(null);
-                } catch (IncompatibleSchemaException e) {
-                    result.completeExceptionally(e);
+                if (existingSchema.schema.getType() != schema.getType()) {
+                    result.completeExceptionally(new IncompatibleSchemaException(
+                            String.format("Incompatible schema: exists schema type %s, new schema type %s",
+                                    existingSchema.schema.getType(), schema.getType())));
+                } else {
+                    try {
+                        checkCompatible(existingSchema, schema, strategy);
+                        result.complete(null);
+                    } catch (IncompatibleSchemaException e) {
+                        result.completeExceptionally(e);
+                    }
                 }
                 return result;
             } else {
@@ -365,17 +370,35 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                                                               SchemaCompatibilityStrategy strategy,
                                                               List<SchemaAndMetadata> schemaAndMetadataList) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        try {
-            compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT).checkCompatible(schemaAndMetadataList
-                    .stream()
-                    .map(schemaAndMetadata -> schemaAndMetadata.schema)
-                    .collect(Collectors.toList()), schema, strategy);
+        if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
             result.complete(null);
-        } catch (Exception e) {
-            if (e instanceof IncompatibleSchemaException) {
-                result.completeExceptionally(e);
+        } else {
+            SchemaAndMetadata breakSchema = null;
+            for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
+                if (schemaAndMetadata.schema.getType() != schema.getType()) {
+                    breakSchema = schemaAndMetadata;
+                    break;
+                }
+            }
+            if (breakSchema == null) {
+                try {
+                    compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
+                            .checkCompatible(schemaAndMetadataList
+                                    .stream()
+                                    .map(schemaAndMetadata -> schemaAndMetadata.schema)
+                                    .collect(Collectors.toList()), schema, strategy);
+                    result.complete(null);
+                } catch (Exception e) {
+                    if (e instanceof IncompatibleSchemaException) {
+                        result.completeExceptionally(e);
+                    } else {
+                        result.completeExceptionally(new IncompatibleSchemaException(e));
+                    }
+                }
             } else {
-                result.completeExceptionally(new IncompatibleSchemaException(e));
+                result.completeExceptionally(new IncompatibleSchemaException(
+                        String.format("Incompatible schema: exists schema type %s, new schema type %s",
+                                breakSchema.schema.getType(), schema.getType())));
             }
         }
         return result;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index c24822f..bc711c5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -30,14 +30,18 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.schema.Schemas;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 
@@ -173,101 +177,6 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
     }
 
     @Test
-    public void structTypeProducerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeProducerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void structTypeProducerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeProducerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-    }
-
-    @Test
-    public void structTypeConsumerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeConsumerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-
-        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void structTypeConsumerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "structTypeConsumerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "1")
-                .subscribe();
-
-        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
-    }
-
-    @Test
     public void primitiveTypeProducerProducerUndefinedCompatible() throws Exception {
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
 
@@ -371,98 +280,50 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
     }
 
     @Test
-    public void primitiveTypeProducerProducerAlwaysCompatible() throws Exception {
+    public void testAlwaysCompatible() throws Exception {
         admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        String topicName = TopicName.get(
+        final String topicName = TopicName.get(
                 TopicDomain.persistent.value(),
                 PUBLIC_TENANT,
                 namespace,
-                "primitiveTypeProducerProducerAlwaysCompatible"
+                "testAlwaysCompatible" + UUID.randomUUID().toString()
         ).toString();
-
-        pulsarClient.newProducer(Schema.INT32)
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newProducer(Schema.STRING)
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void primitiveTypeProducerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeProducerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newProducer(Schema.INT32)
-                .topic(topicName)
-                .create();
-
-        pulsarClient.newConsumer(Schema.STRING)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
-    }
-
-    @Test
-    public void primitiveTypeConsumerProducerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeConsumerProducerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.INT32)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName)
-                .subscribe();
-
-        pulsarClient.newProducer(Schema.STRING)
-                .topic(topicName)
-                .create();
-    }
-
-    @Test
-    public void primitiveTypeConsumerConsumerAlwaysCompatible() throws Exception {
-        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-
-        final String subName = "my-sub";
-        String topicName = TopicName.get(
-                TopicDomain.persistent.value(),
-                PUBLIC_TENANT,
-                namespace,
-                "primitiveTypeConsumerConsumerAlwaysCompatible"
-        ).toString();
-
-        pulsarClient.newConsumer(Schema.INT32)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "1")
-                .subscribe();
-
-        pulsarClient.newConsumer(Schema.STRING)
-                .topic(topicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscriptionName(subName + "2")
-                .subscribe();
+        Schema<?>[] schemas = new Schema[] {
+                Schema.AVRO(Schemas.PersonOne.class),
+                Schema.AVRO(Schemas.PersonFour.class),
+                Schema.JSON(Schemas.PersonOne.class),
+                Schema.JSON(Schemas.PersonFour.class),
+                Schema.INT8,
+                Schema.INT16,
+                Schema.INT32,
+                Schema.INT64,
+                Schema.DATE,
+                Schema.BOOL,
+                Schema.DOUBLE,
+                Schema.STRING,
+                Schema.BYTES,
+                Schema.FLOAT,
+                Schema.INSTANT,
+                Schema.BYTEBUFFER,
+                Schema.TIME,
+                Schema.TIMESTAMP,
+                Schema.LOCAL_DATE,
+                Schema.LOCAL_DATE_TIME,
+                Schema.LOCAL_TIME
+        };
+
+        for (Schema<?> schema : schemas) {
+            pulsarClient.newProducer(schema)
+                    .topic(topicName)
+                    .create();
+        }
+
+        for (Schema<?> schema : schemas) {
+            pulsarClient.newConsumer(schema)
+                    .topic(topicName)
+                    .subscriptionName(UUID.randomUUID().toString())
+                    .subscribe();
+        }
     }
 
 }

[pulsar] 02/03: Fix primitive schema upload for ALWAYS_COMPATIBLE strategy. (#10386)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 592c3f8040a10826ec5488a6836209cddaddd263
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Apr 27 06:56:36 2021 +0800

    Fix primitive schema upload for ALWAYS_COMPATIBLE strategy. (#10386)
    
    * Fix primary schema upload for ALWAYS_COMPATIBLE strategy.
    
    * Fix checkstyle.
    
    (cherry picked from commit f8716c258f095f8cb0ec44ea7e6b56cb431224f3)
---
 .../service/schema/BookkeeperSchemaStorage.java    |  7 +------
 .../SchemaTypeCompatibilityCheckTest.java          | 23 ++++++++++++++++++++--
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 0911ac6..2ecc927 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -294,12 +293,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
 
             if (optLocatorEntry.isPresent()) {
-                // Schema locator was already present
+
                 SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;
-                byte[] storedHash = locator.getInfo().getHash().toByteArray();
-                if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
-                    return completedFuture(locator.getInfo().getVersion());
-                }
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index bc711c5..61cc989 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.schema.compatibility;
 
 import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -313,17 +315,34 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
         };
 
         for (Schema<?> schema : schemas) {
-            pulsarClient.newProducer(schema)
+            Producer<?> p = pulsarClient.newProducer(schema)
                     .topic(topicName)
                     .create();
+            p.close();
         }
 
         for (Schema<?> schema : schemas) {
-            pulsarClient.newConsumer(schema)
+            Consumer<?> c = pulsarClient.newConsumer(schema)
                     .topic(topicName)
                     .subscriptionName(UUID.randomUUID().toString())
                     .subscribe();
+            c.close();
         }
+
+        List<SchemaInfo> schemasOfTopic = admin.schemas().getAllSchemas(topicName);
+
+        // bytes[] schema and bytebuffer schema does not upload schema info to the schema registry
+        assertEquals(schemasOfTopic.size(), schemas.length - 2);
+
+        // Try to upload the schema again.
+        for (Schema<?> schema : schemas) {
+            Producer<?> p = pulsarClient.newProducer(schema)
+                    .topic(topicName)
+                    .create();
+            p.close();
+        }
+
+        assertEquals(schemasOfTopic.size(), schemas.length - 2);
     }
 
 }

[pulsar] 03/03: Add CI to verify C++ client could be built on Windows (#10387)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bc0587d1daeb0b6aa011aa7466505397812366c5
Author: Yunze Xu <xy...@gmail.com>
AuthorDate: Tue Apr 27 02:36:35 2021 +0800

    Add CI to verify C++ client could be built on Windows (#10387)
    
    * Add vcpkg dependencies
    
    * Add dependencies for compression
    
    * Add C++ client Windows build CI
    
    * Remove redundant variables
    
    * Update README
    
    * Fix compile error
    
    (cherry picked from commit d81c16abed5d2f390dafe617fade57e8de644a95)
---
 .github/workflows/ci-cpp-build-windows.yaml | 79 +++++++++++++++++++++++++++
 pulsar-client-cpp/.gitignore                |  7 +++
 pulsar-client-cpp/CMakeLists.txt            |  8 +++
 pulsar-client-cpp/README.md                 | 84 ++++++++++++++++++++++-------
 pulsar-client-cpp/lib/ConnectionPool.cc     |  1 -
 pulsar-client-cpp/lib/JavaStringHash.cc     |  1 +
 pulsar-client-cpp/lib/JavaStringHash.h      |  1 -
 pulsar-client-cpp/lib/TopicName.cc          |  1 -
 pulsar-client-cpp/lib/auth/AuthOauth2.h     |  1 -
 pulsar-client-cpp/vcpkg.json                | 26 +++++++++
 10 files changed, 185 insertions(+), 24 deletions(-)

diff --git a/.github/workflows/ci-cpp-build-windows.yaml b/.github/workflows/ci-cpp-build-windows.yaml
new file mode 100644
index 0000000..87de993
--- /dev/null
+++ b/.github/workflows/ci-cpp-build-windows.yaml
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: CI - CPP build on Windows
+on:
+  pull_request:
+    branches:
+      - master
+  push:
+    branches:
+      - branch-*
+
+env:
+  VCPKG_FEATURE_FLAGS: manifests
+
+jobs:
+  cpp-build-windows:
+    timeout-minutes: 120
+
+    name: ${{ matrix.name }}
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          - name: 'Windows x64'
+            os: windows-latest
+            triplet: x64-windows
+            vcpkg_dir: 'C:\vcpkg'
+            suffix: 'windows-win64'
+            generator: 'Visual Studio 16 2019'
+            arch: '-A x64'
+
+    steps:
+      - name: checkout
+        uses: actions/checkout@v2
+
+      - name: Install vcpkg packages
+        shell: bash
+        run: |
+          cd pulsar-client-cpp && vcpkg install --triplet ${{ matrix.triplet }}
+
+      - name: Configure
+        shell: bash
+        run: |
+          if [ "$RUNNER_OS" == "Windows" ]; then
+            cd pulsar-client-cpp && \
+            cmake \
+              -B ./build \
+              -G "${{ matrix.generator }}" ${{ matrix.arch }} \
+              -DBUILD_PYTHON_WRAPPER=OFF -DBUILD_TESTS=OFF \
+              -DVCPKG_TRIPLET=${{ matrix.triplet }} \
+              -DCMAKE_BUILD_TYPE=Release \
+              -S .
+          fi
+
+      - name: Compile
+        shell: bash
+        run: |
+          if [ "$RUNNER_OS" == "Windows" ]; then
+            cd pulsar-client-cpp && \
+            cmake --build ./build
+          fi
diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore
index f031a55..afb332d 100644
--- a/pulsar-client-cpp/.gitignore
+++ b/pulsar-client-cpp/.gitignore
@@ -73,3 +73,10 @@ pulsar-dist
 install_manifest.txt
 merged-library
 python/venv
+
+# Visual Studio files
+out/
+CMakeSettings.json
+
+# vcpkg dependencies directory
+vcpkg_installed/
diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index d4fdb8e..59b1c00 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -21,6 +21,14 @@ cmake_minimum_required(VERSION 3.4)
 project (pulsar-cpp)
 set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake_modules")
 
+if (VCPKG_TRIPLET)
+    message(STATUS "Use vcpkg, triplet is ${VCPKG_TRIPLET}")
+    set(CMAKE_PREFIX_PATH "${CMAKE_SOURCE_DIR}/vcpkg_installed/${VCPKG_TRIPLET}")
+    message(STATUS "Use CMAKE_PREFIX_PATH: ${CMAKE_PREFIX_PATH}")
+    set(PROTOC_PATH "${CMAKE_PREFIX_PATH}/tools/protobuf/protoc")
+    message(STATUS "Use protoc: ${PROTOC_PATH}")
+endif()
+
 find_program(CCACHE_PROGRAM ccache)
 if(CCACHE_PROGRAM)
     set(CMAKE_CXX_COMPILER_LAUNCHER "ccache")
diff --git a/pulsar-client-cpp/README.md b/pulsar-client-cpp/README.md
index 67ec2bf..f489c3d 100644
--- a/pulsar-client-cpp/README.md
+++ b/pulsar-client-cpp/README.md
@@ -37,14 +37,31 @@ https://github.com/apache/pulsar/tree/master/pulsar-client-cpp/examples
 
 ## Requirements
 
- * CMake
+ * A C++ compiler that supports C++11, like GCC >= 4.8
+ * CMake >= 3.4
  * [Boost](http://www.boost.org/)
- * [Protocol Buffer 2.6](https://developers.google.com/protocol-buffers/)
- * [Log4CXX](https://logging.apache.org/log4cxx)
- * LibCurl
- * [GTest](https://github.com/google/googletest)
- * JsonCpp
+ * [Protocol Buffer](https://developers.google.com/protocol-buffers/)
+ * [libcurl](https://curl.se/libcurl/)
+ * [openssl](https://github.com/openssl/openssl)
 
+It's recommended to use Protocol Buffer 2.6 because it's verified by CI, but 3.x also works.
+
+The default supported [compression types](include/pulsar/CompressionType.h) are:
+
+- `CompressionNone`
+- `CompressionLZ4`
+
+If you want to enable other compression types, you need to install:
+
+- `CompressionZLib`: [zlib](https://zlib.net/)
+- `CompressionZSTD`: [zstd](https://github.com/facebook/zstd)
+- `CompressionSNAPPY`: [snappy](https://github.com/google/snappy)
+
+If you want to build and run the tests, you need to install [GTest](https://github.com/google/googletest). Otherwise, you need to add CMake option `-DBUILD_TESTS=OFF`.
+
+If you don't want to build Python client since `boost-python` may not be easy to install, you need to add CMake option `-DBUILD_PYTHON_WRAPPER=OFF`.
+
+If you want to use `ClientConfiguration::setLogConfFilePath`, you need to install the [Log4CXX](https://logging.apache.org/log4cxx) and add CMake option `-DUSE_LOG4CXX=ON`.
 
 ## Platforms
 
@@ -52,6 +69,7 @@ Pulsar C++ Client Library has been tested on:
 
 * Linux
 * Mac OS X
+* Windows x64
 
 ## Compilation
 
@@ -76,7 +94,7 @@ Run unit tests:
 ```shell
 apt-get install -y g++ cmake libssl-dev libcurl4-openssl-dev liblog4cxx-dev \
                 libprotobuf-dev libboost-all-dev  libgtest-dev google-mock \
-                libjsoncpp-dev libxml2-utils protobuf-compiler python-setuptools
+                protobuf-compiler python-setuptools
 ```
 
 #### Compile and install Google Test:
@@ -167,22 +185,45 @@ ${PULSAR_PATH}/pulsar-client-cpp/perf/perfConsumer
 
 ### Compile on Windows
 
-#### Install all dependencies:
+#### Install with [vcpkg](https://github.com/microsoft/vcpkg)
 
-Clone and build all dependencies from source if a binary distro can't be found.
+It's highly recommended to use `vcpkg` for C++ package management on Windows. It's easy to install and well supported by Visual Studio (2015/2017/2019) and CMake. See [here](https://github.com/microsoft/vcpkg#quick-start-windows) for quick start.
 
-- [Boost](https://github.com/boostorg/boost)
-- [LibCurl](https://github.com/curl/curl)
-- [zlib](https://github.com/madler/zlib)
-- [OpenSSL](https://github.com/openssl/openssl)
-- [ProtoBuf](https://github.com/protocolbuffers/protobuf)
-- [dlfcn-win32](https://github.com/dlfcn-win32/dlfcn-win32)
-- [LLVM](https://llvm.org/builds/) (for clang-tidy and clang-format)
+Take 64 bits Windows as an example, you only need to run
 
-If you want to build and run the tests, then also install
-- [GTest and GMock](https://github.com/google/googletest)
+```bash
+vcpkg install --feature-flags=manifests --triplet x64-windows
+```
 
-#### Compile Pulsar client library:
+> NOTE: The default triplet is `x86-windows`, see [here](https://github.com/microsoft/vcpkg/blob/master/docs/users/triplets.md) for more details.
+
+The all dependencies, which are specified by [vcpkg.json](vcpkg.json), will be installed in `vcpkg_installed/` subdirectory,
+
+With `vcpkg`, you only need to run two commands:
+
+```bash
+cmake \
+ -B ./build \
+ -A x64 \
+ -DBUILD_PYTHON_WRAPPER=OFF -DBUILD_TESTS=OFF \
+ -DVCPKG_TRIPLET=x64-windows \
+ -DCMAKE_BUILD_TYPE=Release \
+ -S .
+cmake --build ./build --config Release
+```
+
+Then all artifacts will be built into `build` subdirectory.
+
+> NOTE:
+>
+> 1. Change `Release` to `Debug` if you want to build a debug version library.
+> 2. For 32 bits Windows, you need to use `-A Win32` and `-DVCPKG_TRIPLET=x32-windows`.
+
+#### Install dependencies manually
+
+You need to install [dlfcn-win32](https://github.com/dlfcn-win32/dlfcn-win32) in addition.
+
+If you installed the dependencies manually, you need to run
 
 ```shell
 #If all dependencies are in your path, all that is necessary is
@@ -212,4 +253,7 @@ ${PULSAR_PATH}/pulsar-test-service-stop.sh
 ```
 
 ## Requirements for Contributors
-We welcome contributions from the open source community, kindly make sure your changes are backward compatible with gcc-4.4.7 and Boost 1.41.
+
+It's recommended to install [LLVM](https://llvm.org/builds/) for `clang-tidy` and `clang-format`. Pulsar C++ client use `clang-format` 5.0 to format files, which is a little different with latest `clang-format`.
+
+We welcome contributions from the open source community, kindly make sure your changes are backward compatible with GCC 4.8 and Boost 1.53.
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index 586acec..cc5668b 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -21,7 +21,6 @@
 #include "LogUtils.h"
 #include "Url.h"
 
-#include <boost/iostreams/stream.hpp>
 #include <boost/asio.hpp>
 #include <boost/asio/ssl.hpp>
 
diff --git a/pulsar-client-cpp/lib/JavaStringHash.cc b/pulsar-client-cpp/lib/JavaStringHash.cc
index 7579e0e..bf809bf 100644
--- a/pulsar-client-cpp/lib/JavaStringHash.cc
+++ b/pulsar-client-cpp/lib/JavaStringHash.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include "JavaStringHash.h"
+#include <limits>
 
 namespace pulsar {
 
diff --git a/pulsar-client-cpp/lib/JavaStringHash.h b/pulsar-client-cpp/lib/JavaStringHash.h
index 8498d1a..6059b1a 100644
--- a/pulsar-client-cpp/lib/JavaStringHash.h
+++ b/pulsar-client-cpp/lib/JavaStringHash.h
@@ -24,7 +24,6 @@
 
 #include <cstdint>
 #include <string>
-#include <boost/functional/hash.hpp>
 
 namespace pulsar {
 class PULSAR_PUBLIC JavaStringHash : public Hash {
diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc
index 46fa355..a56b979 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -21,7 +21,6 @@
 #include "PartitionedProducerImpl.h"
 #include "TopicName.h"
 
-#include <boost/format.hpp>
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/find.hpp>
 #include <memory>
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.h b/pulsar-client-cpp/lib/auth/AuthOauth2.h
index f9bdc21..4de6f53 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.h
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.h
@@ -21,7 +21,6 @@
 
 #include <pulsar/Authentication.h>
 #include <string>
-#include <boost/function.hpp>
 
 namespace pulsar {
 
diff --git a/pulsar-client-cpp/vcpkg.json b/pulsar-client-cpp/vcpkg.json
new file mode 100644
index 0000000..3c14d10
--- /dev/null
+++ b/pulsar-client-cpp/vcpkg.json
@@ -0,0 +1,26 @@
+{
+  "name": "pulsar-cpp",
+  "version": "2.8.0",
+  "description": "Pulsar C++ SDK",
+  "dependencies": [
+    "boost-accumulators",
+    "boost-algorithm",
+    "boost-any",
+    "boost-circular-buffer",
+    "boost-asio",
+    "boost-date-time",
+    "boost-predef",
+    "boost-program-options",
+    "boost-property-tree",
+    "boost-random",
+    "boost-serialization",
+    "boost-xpressive",
+    "curl",
+    "dlfcn-win32",
+    "openssl",
+    "protobuf",
+    "snappy",
+    "zlib",
+    "zstd"
+  ]
+}