You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/01/12 07:30:44 UTC
[pulsar] branch master updated: [issue 8337][Worker] Move
initialize dlog namespace metadata to bin/pulsar (#8781)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aca4cd7 [issue 8337][Worker] Move initialize dlog namespace metadata to bin/pulsar (#8781)
aca4cd7 is described below
commit aca4cd7414dfea1efc391f1bc6ab897d8f967e9d
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Tue Jan 12 15:30:09 2021 +0800
[issue 8337][Worker] Move initialize dlog namespace metadata to bin/pulsar (#8781)
Fixes #8337
### Motivation
Currently, starting the function worker service initializes the distributed log namespace, and doing so requires connecting to ZooKeeper. A better way is to provide a tool that initializes the distributed log namespace, similar to `./bin/pulsar initialize-cluster-metadata`.
### Modifications
- Add initialization of the distributed log namespace metadata to `./bin/pulsar initialize-cluster-metadata`
- Add flag to control whether the distributed log namespace is initialized in Function worker
- Add flag to `conf/functions_worker.yml`
### Doc
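If you want to initialize the distributed log metadata with `bin/pulsar`, perform the following steps:
1. Run `./bin/pulsar initialize-cluster-metadata` to initialize the cluster (this now also initializes the distributed log namespace metadata).
2. Set `initializedDlogMetadata` to `true` in `functions_worker.yml` so that the function worker does not initialize it again at startup.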
**Note**: All the changes are compatible.
---
conf/functions_worker.yml | 4 +++
.../apache/pulsar/PulsarClusterMetadataSetup.java | 33 +++++++++++++++++-----
.../pulsar/functions/worker/WorkerConfig.java | 13 +++++++++
.../functions/worker/PulsarWorkerService.java | 24 ++++++++++++----
.../pulsar/functions/worker/WorkerUtils.java | 5 ++--
5 files changed, 63 insertions(+), 16 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index e1f675d..c15e829 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -242,3 +242,7 @@ functionsDirectory: ./functions
# Should connector config be validated during during submission
validateConnectorConfig: false
+
+# Whether the distributed log metadata has already been initialized, so the function worker skips initializing it at runtime.
+# If it is set to true, you must ensure that it has been initialized by the "bin/pulsar initialize-cluster-metadata" command.
+initializedDlogMetadata: false
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 28aac2f..e090f3d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -39,6 +40,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
@@ -141,6 +143,18 @@ public class PulsarClusterMetadataSetup {
}
}
+ private static void initialDlogNamespaceMetadata(String configurationStore, String bkMetadataServiceUri)
+ throws IOException {
+ InternalConfigurationData internalConf = new InternalConfigurationData(
+ configurationStore,
+ configurationStore,
+ null,
+ bkMetadataServiceUri,
+ null
+ );
+ WorkerUtils.initializeDlogNamespace(internalConf);
+ }
+
public static void main(String[] args) throws Exception {
Arguments arguments = new Arguments();
JCommander jcommander = new JCommander();
@@ -195,15 +209,20 @@ public class PulsarClusterMetadataSetup {
}
}
+
+ String uriStr = bkConf.getMetadataServiceUri();
+ if (arguments.existingBkMetadataServiceUri != null) {
+ uriStr = arguments.existingBkMetadataServiceUri;
+ } else if (arguments.bookieMetadataServiceUri != null) {
+ uriStr = arguments.bookieMetadataServiceUri;
+ }
+ ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
+
+ // initial distributed log metadata
+ initialDlogNamespaceMetadata(arguments.configurationStore, uriStr);
+
// Format BookKeeper stream storage metadata
if (arguments.numStreamStorageContainers > 0) {
- String uriStr = bkConf.getMetadataServiceUri();
- if (arguments.existingBkMetadataServiceUri != null) {
- uriStr = arguments.existingBkMetadataServiceUri;
- } else if (arguments.bookieMetadataServiceUri != null) {
- uriStr = arguments.bookieMetadataServiceUri;
- }
- ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
ClusterInitializer initializer = new ZkClusterInitializer(arguments.zookeeper);
initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 77058ac..1bde75d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -400,6 +400,19 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
return tlsEnabled || workerPortTls != null;
}
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Whether to initialize distributed log metadata in runtime"
+ )
+ private Boolean initializedDlogMetadata = false;
+
+ public Boolean isInitializedDlogMetadata() {
+ if (this.initializedDlogMetadata == null){
+ return false;
+ }
+ return this.initializedDlogMetadata;
+ };
+
/******** security settings for pulsar broker client **********/
@FieldContext(
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 528954a..5dcd648 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -266,14 +266,21 @@ public class PulsarWorkerService implements WorkerService {
}
// initialize the dlog namespace
- // TODO: move this as part of pulsar cluster initialization later
+ URI dlogURI;
try {
- return WorkerUtils.initializeDlogNamespace(internalConf);
+ if (workerConfig.isInitializedDlogMetadata()) {
+ dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
+ } else {
+ dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
+ }
} catch (IOException ioe) {
- log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing function packages",
- internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
+ log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing " +
+ "function packages", internalConf.getZookeeperServers(),
+ internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
+
+ return dlogURI;
}
@Override
@@ -363,9 +370,14 @@ public class PulsarWorkerService implements WorkerService {
URI dlogURI;
try {
// initializing dlog namespace for function worker
- dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
+ if (workerConfig.isInitializedDlogMetadata()){
+ dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
+ } else {
+ dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
+ }
} catch (IOException ioe) {
- LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for storing function packages",
+ LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for " +
+ "storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 9361282..6ec4e85 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -155,8 +155,7 @@ public final class WorkerUtils {
return conf;
}
- public static URI newDlogNamespaceURI(InternalConfigurationData internalConf) {
- String zookeeperServers = internalConf.getZookeeperServers();
+ public static URI newDlogNamespaceURI(String zookeeperServers) {
return URI.create(String.format("distributedlog://%s/pulsar/functions", zookeeperServers));
}
@@ -176,7 +175,7 @@ public final class WorkerUtils {
BKDLConfig dlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(dlConfig);
- URI dlogUri = newDlogNamespaceURI(internalConf);
+ URI dlogUri = newDlogNamespaceURI(internalConf.getZookeeperServers());
try {
dlMetadata.create(dlogUri);
} catch (ZKException e) {