You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/06 16:46:58 UTC
[pulsar] branch master updated: Multi version generic schema provider (#3756)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a662757 Multi version generic schema provider (#3756)
a662757 is described below
commit a66275741237e59b76efe7d5339f25a3a89f2fb1
Author: penghui <co...@gmail.com>
AuthorDate: Thu Mar 7 00:46:53 2019 +0800
Multi version generic schema provider (#3756)
* Add multi version generic schema provider.
* fix log output and refactor get schema in lookup service.
---
.../client/impl/BinaryProtoLookupService.java | 11 ++-
.../pulsar/client/impl/HttpLookupService.java | 18 ++++-
.../apache/pulsar/client/impl/LookupService.java | 15 ++++
.../generic/MultiVersionGenericSchemaProvider.java | 87 ++++++++++++++++++++++
.../MultiVersionGenericSchemaProviderTest.java | 67 +++++++++++++++++
.../pulsar/common/schema/BytesSchemaVersion.java | 40 ++++++++++
6 files changed, 234 insertions(+), 4 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 94773e3..8d07552 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.L
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -183,10 +184,16 @@ public class BinaryProtoLookupService implements LookupService {
@Override
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
+ return getSchema(topicName, null);
+ }
+
+
+ @Override
+ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
return client.getCnxPool().getConnection(serviceNameResolver.resolveHost()).thenCompose(clientCnx -> {
long requestId = client.newRequestId();
- ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.empty());
-
+ ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
+ Optional.ofNullable(BytesSchemaVersion.of(version)));
return clientCnx.sendGetSchema(request, requestId);
});
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 955c45e..8e26c87 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -24,7 +24,9 @@ import io.netty.channel.EventLoopGroup;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -136,18 +138,30 @@ class HttpLookupService implements LookupService {
@Override
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
+ return getSchema(topicName, null);
+ }
+
+ @Override
+ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
CompletableFuture<Optional<SchemaInfo>> future = new CompletableFuture<>();
String schemaName = topicName.getSchemaName();
String path = String.format("admin/v2/schemas/%s/schema", schemaName);
-
+ if (version != null) {
+ path = String.format("admin/v2/schemas/%s/schema/%s",
+ schemaName,
+ new String(version, StandardCharsets.UTF_8));
+ }
httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
}).exceptionally(ex -> {
if (ex.getCause() instanceof NotFoundException) {
future.complete(Optional.empty());
} else {
- log.warn("Failed to get schema for topic {} : {}", topicName, ex.getCause().getClass());
+ log.warn("Failed to get schema for topic {} version {}",
+ topicName,
+ version != null ? Base64.getEncoder().encodeToString(version) : null,
+ ex.getCause());
future.completeExceptionally(ex);
}
return null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 5d526a5..dcb6a0b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -67,9 +67,24 @@ public interface LookupService extends AutoCloseable {
*/
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName);
+ /**
+ * Returns current SchemaInfo {@link SchemaInfo} for a given topic.
+ *
+ * @param topicName topic-name
+ * @return SchemaInfo
+ */
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName);
/**
+ * Returns specific version SchemaInfo {@link SchemaInfo} for a given topic.
+ *
+ * @param topicName topic-name
+ * @param version schema info version
+ * @return SchemaInfo
+ */
+ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version);
+
+ /**
* Returns broker-service lookup api url.
*
* @return
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java
new file mode 100644
index 0000000..fbf6c19
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Multi version generic schema provider backed by a Guava cache.
+ *
+ * <p>The first request for a schema version fetches it through the client's lookup service;
+ * the result is then cached (at most 100000 entries, expiring 30 minutes after last access).
+ */
+public class MultiVersionGenericSchemaProvider implements SchemaProvider<GenericRecord> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MultiVersionGenericSchemaProvider.class);
+
+    private final TopicName topicName;
+    private final PulsarClientImpl pulsarClient;
+
+    // Keyed by ByteBuffer rather than byte[]: arrays use identity-based equals/hashCode, so a
+    // byte[] key would never match an equal version held in a different array instance and
+    // every lookup would miss the cache. ByteBuffer compares by content.
+    private final LoadingCache<ByteBuffer, GenericSchema> cache = CacheBuilder.newBuilder().maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<ByteBuffer, GenericSchema>() {
+                @Override
+                public GenericSchema load(ByteBuffer schemaVersion) throws Exception {
+                    // The key always wraps a whole, privately owned array (see getSchema).
+                    return loadSchema(schemaVersion.array());
+                }
+            });
+
+    /**
+     * @param topicName    topic whose schema versions are resolved
+     * @param pulsarClient client whose lookup service is used to fetch schemas
+     */
+    public MultiVersionGenericSchemaProvider(TopicName topicName, PulsarClientImpl pulsarClient) {
+        this.topicName = topicName;
+        this.pulsarClient = pulsarClient;
+    }
+
+    /**
+     * Returns the generic schema for the given schema version, loading and caching it on first use.
+     *
+     * @param schemaVersion raw schema version bytes; may be {@code null}
+     * @return the schema, or {@code null} if {@code schemaVersion} is {@code null}, the lookup
+     *         found no schema for that version, or the lookup failed
+     */
+    @Override
+    public GenericSchema getSchema(byte[] schemaVersion) {
+        if (null == schemaVersion) {
+            return null;
+        }
+        try {
+            // Wrap a private copy so later mutation of the caller's array cannot corrupt the key.
+            return cache.get(ByteBuffer.wrap(schemaVersion.clone()));
+        } catch (CacheLoader.InvalidCacheLoadException e) {
+            // Guava raises this (unchecked) when the loader returns null, i.e. no schema was found.
+            LOG.warn("No generic schema found for topic {} schema version {}",
+                    topicName.toString(), Base64.getEncoder().encodeToString(schemaVersion));
+            return null;
+        } catch (ExecutionException e) {
+            // Version bytes are opaque and not guaranteed to be valid UTF-8, so log them as Base64.
+            LOG.error("Can't get generic schema for topic {} schema version {}",
+                    topicName.toString(), Base64.getEncoder().encodeToString(schemaVersion), e);
+            return null;
+        }
+    }
+
+    /**
+     * Fetches the schema for one version from the lookup service, blocking until it completes.
+     *
+     * @return the schema, or {@code null} when the lookup returns an empty result
+     */
+    private GenericSchema loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException {
+        Optional<SchemaInfo> schemaInfo = pulsarClient.getLookup()
+                .getSchema(topicName, schemaVersion).get();
+        return schemaInfo.map(GenericSchemaImpl::of).orElse(null);
+    }
+
+    /** Returns the topic this provider resolves schemas for. */
+    public TopicName getTopic() {
+        return topicName;
+    }
+
+    /** Returns the client used for schema lookups. */
+    public PulsarClientImpl getPulsarClient() {
+        return pulsarClient;
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
new file mode 100644
index 0000000..884a674
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Unit test for {@link MultiVersionGenericSchemaProvider}.
+ *
+ * <p>The provider is built around a mocked client whose lookup service is also a mock, so no
+ * broker is needed.
+ */
+public class MultiVersionGenericSchemaProviderTest {
+
+    private MultiVersionGenericSchemaProvider schemaProvider;
+
+    @BeforeMethod
+    public void setup() {
+        LookupService lookup = mock(LookupService.class);
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        when(client.getLookup()).thenReturn(lookup);
+
+        TopicName topic = TopicName.get("persistent://public/default/my-topic");
+        schemaProvider = new MultiVersionGenericSchemaProvider(topic, client);
+    }
+
+    @Test
+    public void testGetSchema() {
+        // Stub the versioned lookup to answer immediately with a known Avro schema.
+        SchemaInfo expected = AvroSchema.of(SchemaTestUtils.Foo.class).getSchemaInfo();
+        LookupService lookup = schemaProvider.getPulsarClient().getLookup();
+        when(lookup.getSchema(any(TopicName.class), any(byte[].class)))
+                .thenReturn(CompletableFuture.completedFuture(Optional.of(expected)));
+
+        GenericSchema actual = schemaProvider.getSchema(new byte[0]);
+
+        assertEquals(actual.getSchemaInfo(), expected);
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/BytesSchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/BytesSchemaVersion.java
new file mode 100644
index 0000000..13e875f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/BytesSchemaVersion.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+/**
+ * A {@link SchemaVersion} backed by a raw byte array.
+ *
+ * <p>Instances compare by the content of their bytes, so two versions created from equal
+ * arrays are equal and can be used as hash keys. The array is stored and returned without
+ * copying; callers must not modify it after handing it over.
+ */
+public class BytesSchemaVersion implements SchemaVersion {
+
+    private final byte[] bytes;
+
+    private BytesSchemaVersion(byte[] bytes) {
+        this.bytes = bytes;
+    }
+
+    /** Returns the underlying version bytes (not a copy). */
+    @Override
+    public byte[] bytes() {
+        return bytes;
+    }
+
+    /**
+     * Creates a version wrapping the given bytes.
+     *
+     * @param bytes raw version bytes; may be {@code null}
+     * @return the wrapping version, or {@code null} when {@code bytes} is {@code null}
+     */
+    public static BytesSchemaVersion of(byte[] bytes) {
+        return bytes != null ? new BytesSchemaVersion(bytes) : null;
+    }
+
+    // Arrays use identity equality, so content-based equals/hashCode must be spelled out here.
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof BytesSchemaVersion)) {
+            return false;
+        }
+        return java.util.Arrays.equals(bytes, ((BytesSchemaVersion) obj).bytes);
+    }
+
+    @Override
+    public int hashCode() {
+        return java.util.Arrays.hashCode(bytes);
+    }
+}