You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/01/12 07:30:44 UTC

[pulsar] branch master updated: [issue 8337][Worker] Move initialize dlog namespace metadata to bin/pulsar (#8781)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aca4cd7  [issue 8337][Worker] Move initialize dlog namespace metadata to bin/pulsar (#8781)
aca4cd7 is described below

commit aca4cd7414dfea1efc391f1bc6ab897d8f967e9d
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Tue Jan 12 15:30:09 2021 +0800

    [issue 8337][Worker] Move initialize dlog namespace metadata to bin/pulsar (#8781)
    
    Fixes #8337
    
    ### Motivation
    
    Currently, starting the function worker service initializes the distributed log namespace, and that initialization requires connecting to ZooKeeper. A better approach is to provide a tool that initializes the distributed log namespace ahead of time, similar to `./bin/pulsar initialize-cluster-metadata`.
    
    ### Modifications
    
    - Initialize the distributed log namespace metadata as part of `./bin/pulsar initialize-cluster-metadata`
    - Add the `initializedDlogMetadata` flag to control whether the function worker initializes the distributed log namespace itself
    - Add the `initializedDlogMetadata` flag to `conf/functions_worker.yml`
    
    ### Doc
    
    If you want to initialize the distributed log metadata with `bin/pulsar`, perform the following steps:
    1. Run `./bin/pulsar initialize-cluster-metadata` to initialize the cluster
    2. Set `initializedDlogMetadata` to `true` in `functions_worker.yml`
    
    **Note**: All the changes are compatible.
---
 conf/functions_worker.yml                          |  4 +++
 .../apache/pulsar/PulsarClusterMetadataSetup.java  | 33 +++++++++++++++++-----
 .../pulsar/functions/worker/WorkerConfig.java      | 13 +++++++++
 .../functions/worker/PulsarWorkerService.java      | 24 ++++++++++++----
 .../pulsar/functions/worker/WorkerUtils.java       |  5 ++--
 5 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index e1f675d..c15e829 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -242,3 +242,7 @@ functionsDirectory: ./functions
 
 # Should connector config be validated during during submission
 validateConnectorConfig: false
+
+# Whether the distributed log metadata has already been initialized, so the function worker skips initializing it at runtime.
+# If it is set to true, you must ensure that it has been initialized by the "bin/pulsar initialize-cluster-metadata" command.
+initializedDlogMetadata: false
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 28aac2f..e090f3d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
 import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.admin.ZkAdminPaths;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -39,6 +40,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
@@ -141,6 +143,18 @@ public class PulsarClusterMetadataSetup {
         }
     }
 
+    private static void initialDlogNamespaceMetadata(String configurationStore, String bkMetadataServiceUri)
+            throws IOException {
+        InternalConfigurationData internalConf = new InternalConfigurationData(
+                configurationStore,
+                configurationStore,
+                null,
+                bkMetadataServiceUri,
+                null
+        );
+        WorkerUtils.initializeDlogNamespace(internalConf);
+    }
+
     public static void main(String[] args) throws Exception {
         Arguments arguments = new Arguments();
         JCommander jcommander = new JCommander();
@@ -195,15 +209,20 @@ public class PulsarClusterMetadataSetup {
             }
         }
 
+
+        String uriStr = bkConf.getMetadataServiceUri();
+        if (arguments.existingBkMetadataServiceUri != null) {
+            uriStr = arguments.existingBkMetadataServiceUri;
+        } else if (arguments.bookieMetadataServiceUri != null) {
+            uriStr = arguments.bookieMetadataServiceUri;
+        }
+        ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
+
+        // initial distributed log metadata
+        initialDlogNamespaceMetadata(arguments.configurationStore, uriStr);
+
         // Format BookKeeper stream storage metadata
         if (arguments.numStreamStorageContainers > 0) {
-            String uriStr = bkConf.getMetadataServiceUri();
-            if (arguments.existingBkMetadataServiceUri != null) {
-                uriStr = arguments.existingBkMetadataServiceUri;
-            } else if (arguments.bookieMetadataServiceUri != null) {
-                uriStr = arguments.bookieMetadataServiceUri;
-            }
-            ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
             ClusterInitializer initializer = new ZkClusterInitializer(arguments.zookeeper);
             initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
         }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 77058ac..1bde75d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -400,6 +400,19 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     	return tlsEnabled || workerPortTls != null;
     }
 
+    @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "Whether to initialize distributed log metadata in runtime"
+    )
+    private Boolean initializedDlogMetadata = false;
+
+    public Boolean isInitializedDlogMetadata() {
+        if (this.initializedDlogMetadata == null){
+            return false;
+        }
+        return this.initializedDlogMetadata;
+    };
+
     /******** security settings for pulsar broker client **********/
 
     @FieldContext(
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 528954a..5dcd648 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -266,14 +266,21 @@ public class PulsarWorkerService implements WorkerService {
         }
 
         // initialize the dlog namespace
-        // TODO: move this as part of pulsar cluster initialization later
+        URI dlogURI;
         try {
-            return WorkerUtils.initializeDlogNamespace(internalConf);
+            if (workerConfig.isInitializedDlogMetadata()) {
+                dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
+            } else {
+                dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
+            }
         } catch (IOException ioe) {
-            log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing function packages",
-                internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
+            log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing " +
+                            "function packages", internalConf.getZookeeperServers(),
+                    internalConf.getBookkeeperMetadataServiceUri(), ioe);
             throw ioe;
         }
+
+        return dlogURI;
     }
 
     @Override
@@ -363,9 +370,14 @@ public class PulsarWorkerService implements WorkerService {
         URI dlogURI;
         try {
             // initializing dlog namespace for function worker
-            dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
+            if (workerConfig.isInitializedDlogMetadata()){
+                dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
+            } else {
+                dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
+            }
         } catch (IOException ioe) {
-            LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for storing function packages",
+            LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for " +
+                            "storing function packages",
                 internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
             throw ioe;
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 9361282..6ec4e85 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -155,8 +155,7 @@ public final class WorkerUtils {
         return conf;
     }
 
-    public static URI newDlogNamespaceURI(InternalConfigurationData internalConf) {
-        String zookeeperServers = internalConf.getZookeeperServers();
+    public static URI newDlogNamespaceURI(String zookeeperServers) {
         return URI.create(String.format("distributedlog://%s/pulsar/functions", zookeeperServers));
     }
 
@@ -176,7 +175,7 @@ public final class WorkerUtils {
         BKDLConfig dlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
         DLMetadata dlMetadata = DLMetadata.create(dlConfig);
 
-        URI dlogUri = newDlogNamespaceURI(internalConf);
+        URI dlogUri = newDlogNamespaceURI(internalConf.getZookeeperServers());
         try {
             dlMetadata.create(dlogUri);
         } catch (ZKException e) {