You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/07 23:07:34 UTC

[incubator-pulsar] branch master updated: Move functions to use V2 style namespaces for internal data keeping (#1742)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a2f8aa8  Move functions to use V2 style namespaces for internal data keeping (#1742)
a2f8aa8 is described below

commit a2f8aa86943de11da944250ceafd470f26421699
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon May 7 16:07:31 2018 -0700

    Move functions to use V2 style namespaces for internal data keeping (#1742)
    
    * Move functions to use V2 style namespaces for internal data keeping
    
    * Incorporate feedback
---
 conf/functions_worker.yml                                        | 3 ++-
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java    | 9 +++++----
 .../src/main/java/org/apache/pulsar/functions/worker/Worker.java | 3 +++
 .../java/org/apache/pulsar/functions/worker/WorkerConfig.java    | 1 +
 4 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index ff9d231..663c4f7 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -23,7 +23,8 @@ workerPort: 6750
 functionMetadataTopicName: metadata
 functionMetadataSnapshotsTopicPath: snapshots
 clusterCoordinationTopicName: coordinate
-pulsarFunctionsNamespace: sample/standalone/functions
+pulsarFunctionsNamespace: public/functions
+pulsarFunctionsCluster: standalone
 pulsarServiceUrl: pulsar://localhost:6650
 pulsarWebServiceUrl: http://localhost:8080
 numFunctionPackageReplicas: 1
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 38d8e84..7b975f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -30,9 +30,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -81,6 +79,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -852,7 +851,7 @@ public class PulsarService implements AutoCloseable {
                     .getWorkerConfig().getPulsarFunctionsNamespace();
             String[] a = functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/");
             String property = a[0];
-            String cluster = a[1];
+            String cluster = functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster();
 
                 /*
                 multiple brokers may be trying to create the property, cluster, and namespace
@@ -904,6 +903,8 @@ public class PulsarService implements AutoCloseable {
             // create namespace for function worker service
             try {
                 Policies policies = new Policies();
+                policies.retention_policies = new RetentionPolicies(-1, -1);
+                policies.replication_clusters = Collections.singleton(functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster());
                 int defaultNumberOfBundles = this.getConfiguration().getDefaultNumberOfNamespaceBundles();
                 policies.bundles = getBundles(defaultNumberOfBundles);
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 469cdf0..4f700ab 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.functions.worker.rest.WorkerServer;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
 
 @Slf4j
 public class Worker extends AbstractService {
@@ -104,6 +105,8 @@ public class Worker extends AbstractService {
                     try {
                         Policies policies = new Policies();
                         policies.retention_policies = new RetentionPolicies(-1, -1);
+                        policies.replication_clusters = new HashSet<>();
+                        policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
                         admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(),
                                 policies);
                     } catch (PulsarAdminException e1) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 06f4e53..161d210 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -51,6 +51,7 @@ public class WorkerConfig implements Serializable {
     private String clusterCoordinationTopicName;
     private String functionMetadataSnapshotsTopicPath;
     private String pulsarFunctionsNamespace;
+    private String pulsarFunctionsCluster;
     private int numFunctionPackageReplicas;
     private String downloadDirectory;
     private long snapshotFreqMs;

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.