You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:13 UTC

[pulsar] 27/38: Make SchemaStorage accessible in Offloader (#6567)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d8b6e3fe6075417650bd0620f655db9a8cb37a99
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 01:15:00 2020 +0800

    Make SchemaStorage accessible in Offloader (#6567)
    
    While offloading ledgers from bookies to secondary storage, we could offload the ledgers in a columnar format. Columnar data can speed up analytical workloads by skipping unnecessary columns or data blocks (known as column pruning and filter push-down in analytical systems).
    
    The only blocker on the Pulsar side is that offloaders cannot get the schema of the ledgers; this PR makes the schema storage accessible from offloaders.
    (cherry picked from commit 322347714301371b2e8df7b107c5fd52bb7b17fa)
---
 .../bookkeeper/mledger/LedgerOffloaderFactory.java | 18 ++++++++++++
 .../org/apache/pulsar/broker/PulsarService.java    | 23 +++++++++++++++-
 .../service/schema/BookkeeperSchemaStorage.java    |  2 ++
 .../schema/BookkeeperSchemaStorageFactory.java     |  1 +
 .../service/schema/SchemaRegistryService.java      | 32 ++++++++--------------
 .../service/schema/SchemaRegistryServiceImpl.java  |  1 +
 .../service/schema/SchemaStorageFactory.java       |  1 +
 .../org/apache/pulsar/client/impl/MessageImpl.java |  2 +-
 .../pulsar/common/api/raw/MessageParser.java       |  2 +-
 .../pulsar/common/api/raw/RawMessageIdImpl.java    |  3 ++
 .../common/protocol}/schema/SchemaStorage.java     |  6 ++--
 .../common/protocol}/schema/StoredSchema.java      | 12 ++++----
 12 files changed, 72 insertions(+), 31 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
index 7a0e6dc..bffffa5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -25,6 +25,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 
 /**
  * Factory to create {@link LedgerOffloader} to offload ledgers into long-term storage.
@@ -55,4 +56,21 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
              OrderedScheduler scheduler)
         throws IOException;
 
+    /**
+     * Create a ledger offloader with the provided configuration, user-metadata, schema storage and scheduler.
+     *
+     * @param offloadPolicies offload policies
+     * @param userMetadata user metadata
+     * @param schemaStorage used for schema lookup in offloader; not used by this default implementation
+     * @param scheduler scheduler
+     * @return the offloader instance; by default, the result of the overload without {@code schemaStorage}
+     * @throws IOException if the offloader cannot be created
+     */
+    default T create(OffloadPolicies offloadPolicies,
+             Map<String, String> userMetadata,
+             SchemaStorage schemaStorage,
+             OrderedScheduler scheduler)
+            throws IOException {
+        return create(offloadPolicies, userMetadata, scheduler);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5236efa..eeb3b1a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -33,6 +33,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Collections;
@@ -104,6 +105,7 @@ import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
@@ -180,6 +182,7 @@ public class PulsarService implements AutoCloseable {
     private String brokerServiceUrl;
     private String brokerServiceUrlTls;
     private final String brokerVersion;
+    private SchemaStorage schemaStorage = null;
     private SchemaRegistryService schemaRegistryService = null;
     private final Optional<WorkerService> functionWorkerService;
     private ProtocolHandlers protocolHandlers = null;
@@ -403,7 +406,10 @@ public class PulsarService implements AutoCloseable {
 
             // needs load management service and before start broker service,
             this.startNamespaceService();
-            schemaRegistryService = SchemaRegistryService.create(this);
+
+            schemaStorage = createAndStartSchemaStorage();
+            schemaRegistryService = SchemaRegistryService.create(
+                    schemaStorage, config.getSchemaRegistryCompatibilityCheckers());
 
             this.defaultOffloader = createManagedLedgerOffloader(
                     OffloadPolicies.create(this.getConfiguration().getProperties()));
@@ -817,6 +823,7 @@ public class PulsarService implements AutoCloseable {
                             LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
                             LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
                         ),
+                        schemaStorage,
                         getOffloaderScheduler(offloadPolicies));
                 } catch (IOException ioe) {
                     throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
@@ -830,6 +837,20 @@ public class PulsarService implements AutoCloseable {
         }
     }
 
+    /** Creates and starts the configured schema storage via reflection; returns null if any step fails. */
+    private SchemaStorage createAndStartSchemaStorage() {
+        try {
+            final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName());
+            Method createMethod = storageClass.getMethod("create", PulsarService.class);
+            SchemaStorage storage = (SchemaStorage) createMethod.invoke(storageClass.newInstance(), this);
+            storage.start();
+            return storage; // only a successfully started storage is handed out
+        } catch (Exception e) {
+            LOG.warn("Unable to create schema registry storage", e); // pass the cause so it is logged
+            return null; // also covers a failed start(), so callers never see an unstarted storage
+        }
+    }
+
     public ZooKeeperCache getLocalZkCache() {
         return localZkCache;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 02e48c8..df41e71 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -50,7 +50,9 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
index 4b25374..8304ed1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.schema;
 
 import javax.validation.constraints.NotNull;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 
 @SuppressWarnings("unused")
 public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 321ef7c..07d37e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -25,12 +25,12 @@ import java.util.Set;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public interface SchemaRegistryService extends SchemaRegistry {
-    String CreateMethodName = "create";
     Logger log = LoggerFactory.getLogger(SchemaRegistryService.class);
     long NO_SCHEMA_VERSION = -1L;
 
@@ -44,26 +44,16 @@ public interface SchemaRegistryService extends SchemaRegistry {
         return checkers;
     }
 
-    static SchemaRegistryService create(PulsarService pulsar) {
-        try {
-            ServiceConfiguration config = pulsar.getConfiguration();
-            final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName());
-            Object factoryInstance = storageClass.newInstance();
-            Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class);
-
-            SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar);
-
-            Map<SchemaType, SchemaCompatibilityCheck> checkers =
-                getCheckers(config.getSchemaRegistryCompatibilityCheckers());
-
-            checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
-
-            schemaStorage.start();
-
-            return SchemaRegistryServiceWithSchemaDataValidator.of(
-                new SchemaRegistryServiceImpl(schemaStorage, checkers));
-        } catch (Exception e) {
-            log.warn("Unable to create schema registry storage, defaulting to empty storage", e);
+    static SchemaRegistryService create(SchemaStorage schemaStorage, Set<String> schemaRegistryCompatibilityCheckers) {
+        if (schemaStorage != null) { // a null storage skips straight to the default registry returned below
+            try {
+                Map<SchemaType, SchemaCompatibilityCheck> checkers = getCheckers(schemaRegistryCompatibilityCheckers);
+                checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
+                return SchemaRegistryServiceWithSchemaDataValidator.of(
+                        new SchemaRegistryServiceImpl(schemaStorage, checkers));
+            } catch (Exception e) {
+                log.warn("Unable to create schema registry storage, defaulting to empty storage", e);
+            }
         }
         return new DefaultSchemaRegistryService();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 4211882..7307e4c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
index c4cff34..483e310 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.schema;
 
 import javax.validation.constraints.NotNull;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 
 public interface SchemaStorageFactory {
     @NotNull
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index f11ae77..0b1f397 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -65,7 +65,7 @@ public class MessageImpl<T> implements Message<T> {
     private final int redeliveryCount;
 
     // Constructor for out-going message
-    static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload, Schema<T> schema) {
+    public static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload, Schema<T> schema) {
         @SuppressWarnings("unchecked")
         MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
         msg.msgMetadataBuilder = msgMetadataBuilder;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index b791a80..aeb60ef 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -45,7 +45,7 @@ public class MessageParser {
      * Definition of an interface to process a raw Pulsar entry payload.
      */
     public interface MessageProcessor {
-        void process(RawMessage message);
+        void process(RawMessage message) throws IOException;
     }
 
     /**
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java
index d388cc7..f48cd4c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.common.api.raw;
 
+import lombok.Getter;
+
+@Getter
 public class RawMessageIdImpl implements RawMessageId {
 
     long ledgerId;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
similarity index 92%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
index f133666..9f007aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service.schema;
+package org.apache.pulsar.common.protocol.schema;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 
+/**
+ * Schema storage.
+ */
 public interface SchemaStorage {
 
     CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/StoredSchema.java
similarity index 86%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/StoredSchema.java
index b1866ec..7952b4e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/StoredSchema.java
@@ -16,18 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service.schema;
+package org.apache.pulsar.common.protocol.schema;
 
 import com.google.common.base.MoreObjects;
 import java.util.Arrays;
 import java.util.Objects;
-import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 
+/**
+ * Stored schema with version.
+ */
 public class StoredSchema {
     public final byte[] data;
     public final SchemaVersion version;
 
-    StoredSchema(byte[] data, SchemaVersion version) {
+    public StoredSchema(byte[] data, SchemaVersion version) {
         this.data = data;
         this.version = version;
     }
@@ -41,8 +43,8 @@ public class StoredSchema {
             return false;
         }
         StoredSchema that = (StoredSchema) o;
-        return Arrays.equals(data, that.data) &&
-            Objects.equals(version, that.version);
+        return Arrays.equals(data, that.data)
+                && Objects.equals(version, that.version);
     }
 
     @Override