You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/06 16:46:58 UTC

[pulsar] branch master updated: Multi version generic schema provider (#3756)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a662757  Multi version generic schema provider (#3756)
a662757 is described below

commit a66275741237e59b76efe7d5339f25a3a89f2fb1
Author: penghui <co...@gmail.com>
AuthorDate: Thu Mar 7 00:46:53 2019 +0800

    Multi version generic schema provider (#3756)
    
    * Add multi version generic schema provider.
    
    * fix log output and refactor get schema in lookup service.
---
 .../client/impl/BinaryProtoLookupService.java      | 11 ++-
 .../pulsar/client/impl/HttpLookupService.java      | 18 ++++-
 .../apache/pulsar/client/impl/LookupService.java   | 15 ++++
 .../generic/MultiVersionGenericSchemaProvider.java | 87 ++++++++++++++++++++++
 .../MultiVersionGenericSchemaProviderTest.java     | 67 +++++++++++++++++
 .../pulsar/common/schema/BytesSchemaVersion.java   | 40 ++++++++++
 6 files changed, 234 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 94773e3..8d07552 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.L
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -183,10 +184,16 @@ public class BinaryProtoLookupService implements LookupService {
 
     @Override
     public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
+        return getSchema(topicName, null);
+    }
+
+
+    @Override
+    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
         return client.getCnxPool().getConnection(serviceNameResolver.resolveHost()).thenCompose(clientCnx -> {
             long requestId = client.newRequestId();
-            ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.empty());
-
+            ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
+                    Optional.ofNullable(BytesSchemaVersion.of(version)));
             return clientCnx.sendGetSchema(request, requestId);
         });
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 955c45e..8e26c87 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -24,7 +24,9 @@ import io.netty.channel.EventLoopGroup;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -136,18 +138,30 @@ class HttpLookupService implements LookupService {
 
     @Override
     public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
+        return getSchema(topicName, null);
+    }
+
+    @Override
+    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
         CompletableFuture<Optional<SchemaInfo>> future = new CompletableFuture<>();
 
         String schemaName = topicName.getSchemaName();
         String path = String.format("admin/v2/schemas/%s/schema", schemaName);
-
+        if (version != null) {
+            path = String.format("admin/v2/schemas/%s/schema/%s",
+                    schemaName,
+                    new String(version, StandardCharsets.UTF_8));
+        }
         httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
             future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
         }).exceptionally(ex -> {
             if (ex.getCause() instanceof NotFoundException) {
                 future.complete(Optional.empty());
             } else {
-                log.warn("Failed to get schema for topic {} : {}", topicName, ex.getCause().getClass());
+                log.warn("Failed to get schema for topic {} version {}",
+                        topicName,
+                        version != null ? Base64.getEncoder().encodeToString(version) : null,
+                        ex.getCause());
                 future.completeExceptionally(ex);
             }
             return null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 5d526a5..dcb6a0b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -67,9 +67,24 @@ public interface LookupService extends AutoCloseable {
 	 */
 	public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName);
 
+	/**
+	 * Returns current SchemaInfo {@link SchemaInfo} for a given topic.
+	 *
+	 * @param topicName topic-name
+	 * @return SchemaInfo
+	 */
 	public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName);
 
 	/**
+	 * Returns specific version SchemaInfo {@link SchemaInfo} for a given topic.
+	 *
+	 * @param topicName topic-name
+	 * @param version schema info version
+	 * @return SchemaInfo
+	 */
+	public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version);
+
+	/**
 	 * Returns broker-service lookup api url.
 	 *
 	 * @return
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java
new file mode 100644
index 0000000..fbf6c19
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Multi version generic schema provider by guava cache.
+ *
+ * <p>Schemas are fetched through the client's lookup service and cached per schema version.
+ */
+public class MultiVersionGenericSchemaProvider implements SchemaProvider<GenericRecord> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MultiVersionGenericSchemaProvider.class);
+
+    private final TopicName topicName;
+    private final PulsarClientImpl pulsarClient;
+
+    // Keyed by ByteBuffer instead of byte[]: arrays use identity equals/hashCode, so a byte[] key
+    // would miss the cache (and reload the schema) for every lookup made with a different array.
+    private final LoadingCache<ByteBuffer, GenericSchema> cache = CacheBuilder.newBuilder().maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<ByteBuffer, GenericSchema>() {
+                @Override
+                public GenericSchema load(ByteBuffer schemaVersion) throws Exception {
+                    return loadSchema(schemaVersion.array());
+                }
+            });
+
+    public MultiVersionGenericSchemaProvider(TopicName topicName, PulsarClientImpl pulsarClient) {
+        this.topicName = topicName;
+        this.pulsarClient = pulsarClient;
+    }
+
+    /**
+     * Returns the generic schema for a schema version, loading it on the first request.
+     *
+     * @param schemaVersion raw schema version bytes, may be {@code null}
+     * @return the schema, or {@code null} when the version is {@code null}, the lookup finds no
+     *         schema for it, or the lookup fails
+     */
+    @Override
+    public GenericSchema getSchema(byte[] schemaVersion) {
+        if (null == schemaVersion) {
+            return null;
+        }
+        try {
+            // Copy the bytes so the cache key cannot change if the caller reuses its array.
+            return cache.get(ByteBuffer.wrap(schemaVersion.clone()));
+        } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
+            // InvalidCacheLoadException is unchecked; the cache raises it when the loader
+            // returns null, which loadSchema does when no schema exists for the version.
+            LOG.error("Can't get generic schema for topic {} schema version {}",
+                    topicName.toString(), Base64.getEncoder().encodeToString(schemaVersion), e);
+            return null;
+        }
+    }
+
+    /**
+     * Fetches the schema from the lookup service, blocking until the request completes.
+     *
+     * @return the schema, or {@code null} when the lookup returns an empty result
+     */
+    private GenericSchema loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException {
+        Optional<SchemaInfo> schemaInfo = pulsarClient.getLookup()
+                .getSchema(topicName, schemaVersion).get();
+        return schemaInfo.map(GenericSchemaImpl::of).orElse(null);
+    }
+
+    public TopicName getTopic() {
+        return topicName;
+    }
+
+    public PulsarClientImpl getPulsarClient() {
+        return pulsarClient;
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
new file mode 100644
index 0000000..884a674
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Unit test for {@link MultiVersionGenericSchemaProvider}.
+ */
+public class MultiVersionGenericSchemaProviderTest {
+
+    private MultiVersionGenericSchemaProvider schemaProvider;
+
+    /** Builds a provider whose mocked client hands out a mocked lookup service. */
+    @BeforeMethod
+    public void setup() {
+        LookupService lookup = mock(LookupService.class);
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        when(client.getLookup()).thenReturn(lookup);
+        TopicName topic = TopicName.get("persistent://public/default/my-topic");
+        schemaProvider = new MultiVersionGenericSchemaProvider(topic, client);
+    }
+
+    /** The schema info served by the lookup must surface through the provider. */
+    @Test
+    public void testGetSchema() {
+        SchemaInfo expected = AvroSchema.of(SchemaTestUtils.Foo.class).getSchemaInfo();
+        CompletableFuture<Optional<SchemaInfo>> lookupResult =
+                CompletableFuture.completedFuture(Optional.of(expected));
+        LookupService lookup = schemaProvider.getPulsarClient().getLookup();
+        when(lookup.getSchema(any(TopicName.class), any(byte[].class))).thenReturn(lookupResult);
+        GenericSchema actual = schemaProvider.getSchema(new byte[0]);
+        assertEquals(actual.getSchemaInfo(), expected);
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/BytesSchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/BytesSchemaVersion.java
new file mode 100644
index 0000000..13e875f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/BytesSchemaVersion.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import java.util.Arrays;
+
+/**
+ * A {@link SchemaVersion} backed by a raw byte array.
+ *
+ * <p>Equality and hashing follow the array contents rather than array identity.
+ */
+public class BytesSchemaVersion implements SchemaVersion {
+
+    private final byte[] bytes;
+
+    private BytesSchemaVersion(byte[] bytes) {
+        this.bytes = bytes;
+    }
+
+    /** Returns the wrapped bytes; the array is handed out without copying. */
+    @Override
+    public byte[] bytes() {
+        return bytes;
+    }
+
+    /**
+     * Wraps raw version bytes.
+     *
+     * @param bytes raw schema version, may be {@code null}
+     * @return a version wrapping {@code bytes}, or {@code null} when {@code bytes} is {@code null}
+     */
+    public static BytesSchemaVersion of(byte[] bytes) {
+        return bytes != null ? new BytesSchemaVersion(bytes) : null;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof BytesSchemaVersion)) {
+            return false;
+        }
+        return Arrays.equals(bytes, ((BytesSchemaVersion) obj).bytes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(bytes);
+    }
+}