You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/07 23:07:34 UTC
[incubator-pulsar] branch master updated: Move functions to use V2 style namespaces for internal data keeping (#1742)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a2f8aa8 Move functions to use V2 style namespaces for internal data keeping (#1742)
a2f8aa8 is described below
commit a2f8aa86943de11da944250ceafd470f26421699
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon May 7 16:07:31 2018 -0700
Move functions to use V2 style namespaces for internal data keeping (#1742)
* Move functions to use V2 style namespaces for internal data keeping
* Incorporate feedback
---
conf/functions_worker.yml | 3 ++-
.../src/main/java/org/apache/pulsar/broker/PulsarService.java | 9 +++++----
.../src/main/java/org/apache/pulsar/functions/worker/Worker.java | 3 +++
.../java/org/apache/pulsar/functions/worker/WorkerConfig.java | 1 +
4 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index ff9d231..663c4f7 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -23,7 +23,8 @@ workerPort: 6750
functionMetadataTopicName: metadata
functionMetadataSnapshotsTopicPath: snapshots
clusterCoordinationTopicName: coordinate
-pulsarFunctionsNamespace: sample/standalone/functions
+pulsarFunctionsNamespace: public/functions
+pulsarFunctionsCluster: standalone
pulsarServiceUrl: pulsar://localhost:6650
pulsarWebServiceUrl: http://localhost:8080
numFunctionPackageReplicas: 1
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 38d8e84..7b975f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -30,9 +30,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -81,6 +79,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -852,7 +851,7 @@ public class PulsarService implements AutoCloseable {
.getWorkerConfig().getPulsarFunctionsNamespace();
String[] a = functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/");
String property = a[0];
- String cluster = a[1];
+ String cluster = functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster();
/*
multiple brokers may be trying to create the property, cluster, and namespace
@@ -904,6 +903,8 @@ public class PulsarService implements AutoCloseable {
// create namespace for function worker service
try {
Policies policies = new Policies();
+ policies.retention_policies = new RetentionPolicies(-1, -1);
+ policies.replication_clusters = Collections.singleton(functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster());
int defaultNumberOfBundles = this.getConfiguration().getDefaultNumberOfNamespaceBundles();
policies.bundles = getBundles(defaultNumberOfBundles);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 469cdf0..4f700ab 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.functions.worker.rest.WorkerServer;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
+import java.util.HashSet;
@Slf4j
public class Worker extends AbstractService {
@@ -104,6 +105,8 @@ public class Worker extends AbstractService {
try {
Policies policies = new Policies();
policies.retention_policies = new RetentionPolicies(-1, -1);
+ policies.replication_clusters = new HashSet<>();
+ policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(),
policies);
} catch (PulsarAdminException e1) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 06f4e53..161d210 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -51,6 +51,7 @@ public class WorkerConfig implements Serializable {
private String clusterCoordinationTopicName;
private String functionMetadataSnapshotsTopicPath;
private String pulsarFunctionsNamespace;
+ private String pulsarFunctionsCluster;
private int numFunctionPackageReplicas;
private String downloadDirectory;
private long snapshotFreqMs;
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.