You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/05 21:49:52 UTC
[incubator-pulsar] branch master updated: Support running function
worker along with broker (#1329)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b701925 Support running function worker along with broker (#1329)
b701925 is described below
commit b7019252d832c42fb244c07fc92ce4f59801c952
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Mar 5 13:49:50 2018 -0800
Support running function worker along with broker (#1329)
* Move pulsar functions dependency version to root pom and remove duplicated license headers
This addresses some comments in pulsar functions PR #1314
* shade worker
* Fix broken master
* Upgrade the bookkeeper storage client dependency to the official bookkeeper version
This removes the temp dependency in `pulsar-functions-instance`
* set `protobuf2.version` in pulsar-common
* provide a shaded worker
* include worker dependency at broker
* Embedded function worker at broker
* rename 'function worker' to 'functions worker'
* add "--no-functions-worker" for pulsar-client-cpp tests
---
bin/pulsar | 59 ++-
buildtools/pom.xml | 15 +
conf/broker.conf | 6 +
conf/functions_worker.yml | 46 +++
conf/log4j2.yaml | 118 +++++-
managed-ledger/pom.xml | 15 -
pom.xml | 38 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 13 +
pulsar-broker/pom.xml | 90 +++++
.../org/apache/pulsar/PulsarBrokerStarter.java | 55 ++-
.../org/apache/pulsar/PulsarStandaloneStarter.java | 42 +-
.../org/apache/pulsar/broker/PulsarService.java | 12 +
.../pulsar/broker/admin/impl/FunctionsBase.java | 142 +++++++
.../apache/pulsar/broker/admin/v1/Functions.java | 27 +-
.../apache/pulsar/broker/admin/v2/Functions.java | 27 +-
.../org/apache/pulsar/broker/web/WebService.java | 2 +
pulsar-client-cpp/run-unit-tests.sh | 3 +-
.../pom.xml | 86 ++--
.../pulsar/admin/cli/PulsarAdminToolTest.java | 0
.../apache/pulsar/admin/cli/utils/IOUtilsTest.java | 0
.../cli/utils/NameValueParameterSplitterTest.java | 0
.../pulsar/client/cli/PulsarClientToolTest.java | 0
pulsar-client-tools/pom.xml | 133 +++----
pulsar-common/pom.xml | 2 +-
pulsar-functions/bin/pulsar | 6 +-
pulsar-functions/cli/pom.xml | 14 -
pulsar-functions/instance/pom.xml | 1 -
pulsar-functions/pom.xml | 2 +
pulsar-functions/runtime/pom.xml | 5 +
pulsar-functions/utils/pom.xml | 14 -
pulsar-functions/worker-runner/pom.xml | 111 ++++++
pulsar-functions/worker-shaded/pom.xml | 113 ++++++
pulsar-functions/worker/pom.xml | 14 -
.../pulsar/functions/worker/MembershipManager.java | 2 +-
.../org/apache/pulsar/functions/worker/Worker.java | 11 +-
.../pulsar/functions/worker/WorkerService.java | 12 +-
.../functions/worker/rest/FunctionApiResource.java | 14 +-
...nctionApiV2Resource.java => FunctionsImpl.java} | 110 +++---
.../worker/rest/api/v2/FunctionApiV2Resource.java | 440 +--------------------
.../rest/api/v2/FunctionApiV2ResourceTest.java | 7 +-
pulsar-zookeeper-utils/pom.xml | 15 -
41 files changed, 1040 insertions(+), 782 deletions(-)
diff --git a/bin/pulsar b/bin/pulsar
index ed7e988..cafc359 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -31,6 +31,14 @@ DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
+# functions related variables
+FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
+DEFAULT_WORKER_CONF=$PULSAR_HOME/conf/functions_worker.yml
+DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
+JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
+DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
+PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
+
if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
. "$PULSAR_HOME/conf/pulsar_env.sh"
@@ -63,6 +71,34 @@ elif [ -e "$BUILT_JAR" ]; then
PULSAR_JAR=$BUILT_JAR
fi
+#
+# find the instance locations for pulsar-functions
+#
+
+# find the java instance location: use ${JAVA_INSTANCE_JAR} if it exists, else the locally built jar; abort if neither exists
+if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
+ # no released jar at the configured location, so fall back to the jar produced by a local build
+ BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime/target/java-instance.jar"
+ if [ ! -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then
+ echo "\nCouldn't find pulsar-functions java instance jar.";
+ echo "Make sure you've run 'mvn package'\n";
+ exit 1;
+ fi
+ JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
+fi
+
+# find the python instance location: use ${PY_INSTANCE_FILE} if it exists, else the locally built file; abort if neither exists
+if [ ! -f "${PY_INSTANCE_FILE}" ]; then
+ # no released python instance at the configured location, so fall back to the one produced by a local build
+ BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py"
+ if [ ! -f "${BUILT_PY_INSTANCE_FILE}" ]; then
+ echo "\nCouldn't find pulsar-functions python instance.";
+ echo "Make sure you've run 'mvn package'\n";
+ exit 1;
+ fi
+ PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
+fi
+
pulsar_help() {
cat <<EOF
Usage: pulsar <command>
@@ -74,6 +110,7 @@ where command is one of:
discovery Run a discovery server
proxy Run a pulsar proxy
websocket Run a web socket proxy server
+ functions-worker Run a functions worker server
standalone Run a broker server with local bookies and local zookeeper
compact-topic Run compaction against a topic
@@ -93,6 +130,7 @@ Environment variables:
PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
PULSAR_PROXY_CONF Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
+ PULSAR_WORKER_CONF Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
PULSAR_STANDALONE_CONF Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
PULSAR_EXTRA_OPTS Extra options to be passed to the jvm
PULSAR_EXTRA_CLASSPATH Add extra paths to the pulsar classpath
@@ -115,7 +153,7 @@ add_maven_deps_to_classpath() {
f="${PULSAR_HOME}/all/target/classpath.txt"
if [ ! -f "${f}" ]
then
- ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null
+ ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
fi
PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}
@@ -136,6 +174,10 @@ fi
COMMAND=$1
shift
+if [ -z "$PULSAR_WORKER_CONF" ]; then
+ PULSAR_WORKER_CONF=$DEFAULT_WORKER_CONF
+fi
+
if [ -z "$PULSAR_BROKER_CONF" ]; then
PULSAR_BROKER_CONF=$DEFAULT_BROKER_CONF
fi
@@ -186,12 +228,22 @@ OPTS="-cp $PULSAR_CLASSPATH $OPTS"
OPTS="$OPTS $PULSAR_EXTRA_OPTS"
# log directory & file
-PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"Console"}
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
+PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
+PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
+PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}
#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
+OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
+OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
+
+# Functions related logging
+OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
+# instance
+OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
+OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
@@ -218,6 +270,9 @@ elif [ $COMMAND == "proxy" ]; then
elif [ $COMMAND == "websocket" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
+elif [ $COMMAND == "functions-worker" ]; then
+ PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
+ exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
elif [ $COMMAND == "standalone" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 388f043..d83bb2c 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -40,6 +40,21 @@
<artifactId>testng</artifactId>
<version>6.13.1</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>2.10.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.10.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>2.10.0</version>
+ </dependency>
</dependencies>
<build>
diff --git a/conf/broker.conf b/conf/broker.conf
index 93489e5..55e6f19 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -440,3 +440,9 @@ webSocketConnectionsPerBroker=8
# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true
+
+### --- Functions --- ###
+
+# Enable Functions Worker Service in Broker
+functionsWorkerEnabled=false
+
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
new file mode 100644
index 0000000..c2c4091
--- /dev/null
+++ b/conf/functions_worker.yml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+workerId: standalone
+workerHostname: localhost
+workerPort: 6750
+functionMetadataTopicName: metadata
+functionMetadataSnapshotsTopicPath: snapshots
+clusterCoordinationTopicName: coordinate
+pulsarFunctionsNamespace: sample/standalone/functions
+pulsarServiceUrl: pulsar://localhost:6650
+pulsarWebServiceUrl: http://localhost:8080
+numFunctionPackageReplicas: 1
+downloadDirectory: /tmp/pulsar_functions
+metricsConfig:
+ metricsSinkClassName: org.apache.pulsar.functions.metrics.sink.PrometheusSink
+ metricsCollectionInterval: 30
+ metricsSinkConfig:
+ path: /metrics
+ port: 9099
+#threadContainerFactory:
+# threadGroupName: "Thread Function Container Group"
+processContainerFactory:
+ logDirectory:
+
+schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler"
+functionAssignmentTopicName: "assignments"
+failureCheckFreqMs: 30000
+rescheduleTimeoutMs: 60000
+initialBrokerReconnectMaxRetries: 60
diff --git a/conf/log4j2.yaml b/conf/log4j2.yaml
index 13b07ad..cd0ec88 100644
--- a/conf/log4j2.yaml
+++ b/conf/log4j2.yaml
@@ -23,6 +23,21 @@ Configuration:
monitorInterval: 30
name: pulsar
+ Properties:
+ Property:
+ - name: "pulsar.log.dir"
+ value: "logs"
+ - name: "pulsar.log.file"
+ value: "pulsar.log"
+ - name: "pulsar.log.appender"
+ value: "RoutingAppender"
+ - name: "pulsar.log.level"
+ value: "info"
+ - name: "pulsar.routing.appender.default"
+ value: "Console"
+ - name: "bk.log.level"
+ value: "info"
+
# Example: logger-filter script
Scripts:
ScriptFile:
@@ -32,19 +47,19 @@ Configuration:
charset: UTF-8
Appenders:
-
- # Console
+
+ # Console
Console:
name: Console
target: SYSTEM_OUT
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
-
- # Rolling file appender configuration
+
+ # Rolling file appender configuration
RollingFile:
name: RollingFile
fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
- filePattern: "/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
+ filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
immediateFlush: false
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
@@ -54,7 +69,7 @@ Configuration:
modulate: true
SizeBasedTriggeringPolicy:
size: 1 GB
- # Trigger every day at midnight that also scan
+ # Trigger every day at midnight that also scan
# roll-over strategy that deletes older file
CronTriggeringPolicy:
schedule: "0 0 0 * * ?"
@@ -67,16 +82,101 @@ Configuration:
glob: "*/${sys:pulsar.log.file}*log.gz"
IfLastModified:
age: 30d
-
+
+ # Rolling file appender configuration for bk
+ RollingRandomAccessFile:
+ name: BkRollingFile
+ fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk"
+ filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
+ immediateFlush: true
+ PatternLayout:
+ Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
+ Policies:
+ TimeBasedTriggeringPolicy:
+ interval: 1
+ modulate: true
+ SizeBasedTriggeringPolicy:
+ size: 1 GB
+ # Trigger every day at midnight that also scan
+ # roll-over strategy that deletes older file
+ CronTriggeringPolicy:
+ schedule: "0 0 0 * * ?"
+ # Delete file older than 30days
+ DefaultRolloverStrategy:
+ Delete:
+ basePath: ${sys:pulsar.log.dir}
+ maxDepth: 2
+ IfFileName:
+ glob: "*/${sys:pulsar.log.file}.bk*log.gz"
+ IfLastModified:
+ age: 30d
+
+ # Routing
+ Routing:
+ name: RoutingAppender
+ Routes:
+ pattern: "$${ctx:function}"
+ Route:
+ -
+ Routing:
+ name: InstanceRoutingAppender
+ Routes:
+ pattern: "$${ctx:instance}"
+ Route:
+ -
+ RollingFile:
+ name: "Rolling-${ctx:function}"
+ fileName : "${sys:pulsar.log.dir}/functions/${ctx:function}/function.log"
+ filePattern : "${sys:pulsar.log.dir}/functions/${ctx:function}-%d{MM-dd-yyyy}-%i.log.gz"
+ PatternLayout:
+ Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
+ Policies:
+ TimeBasedTriggeringPolicy:
+ interval: 1
+ modulate: true
+ SizeBasedTriggeringPolicy:
+ size: "20MB"
+ # Trigger every day at midnight that also scan
+ # roll-over strategy that deletes older file
+ CronTriggeringPolicy:
+ schedule: "0 0 0 * * ?"
+ # Delete file older than 30days
+ DefaultRolloverStrategy:
+ Delete:
+ basePath: ${sys:pulsar.log.dir}
+ maxDepth: 2
+ IfFileName:
+ glob: "*/${sys:pulsar.log.file}*log.gz"
+ IfLastModified:
+ age: 30d
+ - ref: "${sys:pulsar.routing.appender.default}"
+ key: "${ctx:function}"
+ - ref: "${sys:pulsar.routing.appender.default}"
+ key: "${ctx:function}"
Loggers:
+
+ Logger:
+ name: org.apache.bookkeeper
+ level: "${sys:bk.log.level}"
+ additivity: false
+ AppenderRef:
+ - ref: BkRollingFile
+
+ Logger:
+ name: org.apache.distributedlog
+ level: "${sys:bk.log.level}"
+ additivity: false
+ AppenderRef:
+ - ref: BkRollingFile
# Default root logger configuration
Root:
level: info
additivity: false
AppenderRef:
- ref: "${sys:pulsar.log.appender}"
+ - ref: "${sys:pulsar.log.appender}"
+ level: "${sys:pulsar.log.level}"
# Logger to inject filter script
# Logger:
@@ -90,4 +190,4 @@ Configuration:
# onMisMatch: DENY
# ScriptRef:
# ref: filter.js
-
\ No newline at end of file
+
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 3cd0726..b4f0017 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -36,21 +36,6 @@
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server-shaded</artifactId>
- <version>${bookkeeper.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
diff --git a/pom.xml b/pom.xml
index 8d6dfb2..b6ac3b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-client-admin-shaded</module>
<module>pulsar-client-tools</module>
<module>pulsar-client-tools-shaded</module>
+ <module>pulsar-client-tools-test</module>
<module>pulsar-websocket</module>
<module>pulsar-proxy</module>
<module>pulsar-discovery-service</module>
@@ -130,6 +131,7 @@ flexible messaging model and an intuitive client API.</description>
<puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
<dockerfile-maven.version>1.3.7</dockerfile-maven.version>
<typetools.version>0.5.0</typetools.version>
+ <protobuf2.version>2.4.1</protobuf2.version>
<protobuf3.version>3.5.1</protobuf3.version>
<grpc.version>1.5.0</grpc.version>
<protoc-gen-grpc-java.version>1.0.0</protoc-gen-grpc-java.version>
@@ -232,6 +234,26 @@ flexible messaging model and an intuitive client API.</description>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-server-shaded</artifactId>
+ <version>${bookkeeper.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-java-client</artifactId>
<version>${bookkeeper.version}</version>
</dependency>
@@ -682,22 +704,6 @@ flexible messaging model and an intuitive client API.</description>
</dependency>
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8aa2f82..bd47970 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -425,6 +425,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
// If true, export topic level metrics otherwise namespace level
private boolean exposeTopicLevelMetricsInPrometheus = true;
+ /**** --- Functions --- ****/
+ private boolean functionsWorkerEnabled = false;
+
public String getZookeeperServers() {
return zookeeperServers;
}
@@ -1470,4 +1473,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}
+
+ /**** --- Functions --- ****/
+
+ public void setFunctionsWorkerEnabled(boolean enabled) {
+ this.functionsWorkerEnabled = enabled;
+ }
+
+ public boolean isFunctionsWorkerEnabled() {
+ return functionsWorkerEnabled;
+ }
}
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 14dcca7..abbc831 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -119,6 +119,96 @@
<scope>test</scope>
</dependency>
+ <!-- functions related dependencies (begin) -->
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-worker-shaded</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-all</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-lite</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf.nano</groupId>
+ <artifactId>protobuf-javanano</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf-lite</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf-nano</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-utils</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-metrics</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-instance</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-worker</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>net.jodah</groupId>
+ <artifactId>typetools</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-core-shaded</artifactId>
+ </dependency>
+
+ <!-- functions related dependencies (end) -->
+
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 40f8d59..42d90e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -34,6 +34,7 @@ import java.io.FileInputStream;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.Optional;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
@@ -42,6 +43,9 @@ import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
import org.aspectj.weaver.loadtime.Agent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +76,12 @@ public class PulsarBrokerStarter {
@Parameter(names = {"-bc", "--bookie-conf"}, description = "Configuration file for Bookie")
private String bookieConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf";
+ @Parameter(names = {"-rfw", "--run-functions-worker"}, description = "Run functions worker with Broker")
+ private boolean runFunctionsWorker = false;
+
+ @Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
+ private String fnWorkerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
+
@Parameter(names = {"-h", "--help"}, description = "Show this help message")
private boolean help = false;
}
@@ -103,6 +113,7 @@ public class PulsarBrokerStarter {
private final AutoRecoveryMain autoRecoveryMain;
private final StatsProvider bookieStatsProvider;
private final ServerConfiguration bookieConfig;
+ private final WorkerService functionsWorkerService;
BrokerStarter(String[] args) throws Exception{
StarterArguments starterArguments = new StarterArguments();
@@ -116,15 +127,40 @@ public class PulsarBrokerStarter {
System.exit(-1);
}
- // init broker config and pulsar service
+ // init broker config
if (isBlank(starterArguments.brokerConfigFile)) {
jcommander.usage();
throw new IllegalArgumentException("Need to specify a configuration file for broker");
} else {
brokerConfig = loadConfig(starterArguments.brokerConfigFile);
- pulsarService = new PulsarService(brokerConfig);
}
+ // init functions worker
+ if (starterArguments.runFunctionsWorker || brokerConfig.isFunctionsWorkerEnabled()) {
+ WorkerConfig workerConfig;
+ if (isBlank(starterArguments.fnWorkerConfigFile)) {
+ workerConfig = new WorkerConfig();
+ } else {
+ workerConfig = WorkerConfig.load(starterArguments.fnWorkerConfigFile);
+ }
+ // worker talks to local broker
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + brokerConfig.getBrokerServicePort());
+ workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + brokerConfig.getWebServicePort());
+ String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
+ brokerConfig.getAdvertisedAddress());
+ workerConfig.setWorkerHostname(hostname);
+ workerConfig.setWorkerId(
+ "c-" + brokerConfig.getClusterName()
+ + "-fw-" + hostname
+ + "-" + workerConfig.getWorkerPort());
+ functionsWorkerService = new WorkerService(workerConfig);
+ } else {
+ functionsWorkerService = null;
+ }
+
+ // init pulsar service
+ pulsarService = new PulsarService(brokerConfig, Optional.ofNullable(functionsWorkerService));
+
// if no argument to run bookie in cmd line, read from pulsar config
if (!argsContains(args, "-rb") && !argsContains(args, "--run-bookie")) {
checkState(starterArguments.runBookie == false,
@@ -189,6 +225,16 @@ public class PulsarBrokerStarter {
pulsarService.start();
log.info("PulsarService started.");
+
+ // after broker is started, start the functions worker
+ if (null != functionsWorkerService) {
+ try {
+ functionsWorkerService.start();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw ie;
+ }
+ }
}
public void join() throws InterruptedException {
@@ -203,6 +249,11 @@ public class PulsarBrokerStarter {
}
public void shutdown() {
+ if (null != functionsWorkerService) {
+ functionsWorkerService.stop();
+ log.info("Shut down functions worker service successfully.");
+ }
+
pulsarService.getShutdownService().run();
log.info("Shut down broker service successfully.");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 314eacf..6c8dc14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -23,6 +23,8 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
import java.io.FileInputStream;
import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -31,6 +33,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.aspectj.weaver.loadtime.Agent;
import org.slf4j.Logger;
@@ -48,6 +52,7 @@ public class PulsarStandaloneStarter {
PulsarAdmin admin;
LocalBookkeeperEnsemble bkEnsemble;
ServiceConfiguration config;
+ WorkerService fnWorkerService;
@Parameter(names = { "-c", "--config" }, description = "Configuration file path", required = true)
private String configFile;
@@ -76,6 +81,12 @@ public class PulsarStandaloneStarter {
@Parameter(names = { "--only-broker" }, description = "Only start Pulsar broker service (no ZK, BK)")
private boolean onlyBroker = false;
+ @Parameter(names = {"-nfw", "--no-functions-worker"}, description = "Run Broker without functions worker")
+ private boolean noFunctionsWorker = false; // when set, the standalone starter does not create a functions worker
+
+ @Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
+ private String fnWorkerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
+
@Parameter(names = { "-a", "--advertised-address" }, description = "Standalone broker advertised address")
private String advertisedAddress = null;
@@ -127,6 +138,10 @@ public class PulsarStandaloneStarter {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
+ if (fnWorkerService != null) {
+ fnWorkerService.stop();
+ }
+
if (broker != null) {
broker.close();
}
@@ -162,8 +177,29 @@ public class PulsarStandaloneStarter {
// load aspectj-weaver agent for instrumentation
AgentLoader.loadAgentClass(Agent.class.getName(), null);
+ // initialize the functions worker
+ if (!noFunctionsWorker) {
+ WorkerConfig workerConfig;
+ if (isBlank(fnWorkerConfigFile)) {
+ workerConfig = new WorkerConfig();
+ } else {
+ workerConfig = WorkerConfig.load(fnWorkerConfigFile);
+ }
+ // worker talks to local broker
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort());
+ workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort());
+ String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
+ config.getAdvertisedAddress());
+ workerConfig.setWorkerHostname(hostname);
+ workerConfig.setWorkerId(
+ "c-" + config.getClusterName()
+ + "-fw-" + hostname
+ + "-" + workerConfig.getWorkerPort());
+ fnWorkerService = new WorkerService(workerConfig);
+ }
+
// Start Broker
- broker = new PulsarService(config);
+ broker = new PulsarService(config, Optional.ofNullable(fnWorkerService));
broker.start();
// Create a sample namespace
@@ -203,6 +239,10 @@ public class PulsarStandaloneStarter {
log.info(e.getMessage());
}
+ if (null != fnWorkerService) {
+ fnWorkerService.start();
+ }
+
log.debug("--- setup completed ---");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9b23433..6bbc1e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -62,6 +63,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
@@ -121,6 +123,7 @@ public class PulsarService implements AutoCloseable {
private final String brokerServiceUrl;
private final String brokerServiceUrlTls;
private final String brokerVersion;
+ private final Optional<WorkerService> functionWorkerService;
private final MessagingServiceShutdownHook shutdownService;
@@ -136,6 +139,10 @@ public class PulsarService implements AutoCloseable {
private final Condition isClosedCondition = mutex.newCondition();
public PulsarService(ServiceConfiguration config) {
+ this(config, Optional.empty());
+ }
+
+ public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService) {
// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);
@@ -151,6 +158,7 @@ public class PulsarService implements AutoCloseable {
this.shutdownService = new MessagingServiceShutdownHook(this);
this.loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
+ this.functionWorkerService = functionWorkerService;
}
/**
@@ -543,6 +551,10 @@ public class PulsarService implements AutoCloseable {
return this.nsservice;
}
+ public WorkerService getWorkerService() {
+ return functionWorkerService.orElse(null);
+ }
+
/**
* Get a reference of the current <code>BrokerService</code> instance associated with the current
* <code>PulsarService</code> instance.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
new file mode 100644
index 0000000..7080474
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.Supplier;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+public class FunctionsBase extends AdminResource implements Supplier<WorkerService> {
+
+ private final FunctionsImpl functions;
+
+ public FunctionsBase() {
+ this.functions = new FunctionsImpl(this);
+ }
+
+ @Override
+ public WorkerService get() {
+ return pulsar().getWorkerService();
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{functionName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response registerFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("functionConfig") String functionConfigJson) {
+
+ return functions.registerFunction(
+ tenant, namespace, functionName, uploadedInputStream, fileDetail, functionConfigJson);
+
+ }
+
+ @PUT
+ @Path("/{tenant}/{namespace}/{functionName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response updateFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("functionConfig") String functionConfigJson) {
+
+ return functions.updateFunction(
+ tenant, namespace, functionName, uploadedInputStream, fileDetail, functionConfigJson);
+
+ }
+
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{functionName}")
+ public Response deregisterFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName) {
+ return functions.deregisterFunction(
+ tenant, namespace, functionName);
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{functionName}")
+ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName)
+ throws IOException {
+ return functions.getFunctionInfo(
+ tenant, namespace, functionName);
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
+ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @PathParam("instanceId") String instanceId) throws IOException {
+ return functions.getFunctionInstanceStatus(
+ tenant, namespace, functionName, instanceId);
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{functionName}/status")
+ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName) throws IOException {
+ return functions.getFunctionStatus(
+ tenant, namespace, functionName);
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}")
+ public Response listFunctions(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace) {
+ return functions.listFunctions(
+ tenant, namespace);
+
+ }
+
+ @GET
+ @Path("/cluster")
+ public Response getCluster() {
+ return functions.getCluster();
+ }
+
+ @GET
+ @Path("/assignments")
+ public Response getAssignments() {
+ return functions.getAssignments();
+ }
+
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
similarity index 55%
copy from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
index 7d8b054..1740732 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
@@ -16,26 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.worker.rest;
+package org.apache.pulsar.broker.admin.v1;
-import javax.servlet.ServletContext;
-import javax.ws.rs.core.Context;
-import org.apache.pulsar.functions.worker.WorkerService;
-
-public class FunctionApiResource {
-
- public static final String ATTRIBUTE_FUNCTION_WORKER = "function-worker";
-
- private WorkerService workerService;
-
- @Context
- protected ServletContext servletContext;
-
- public synchronized WorkerService worker() {
- if (this.workerService == null) {
- this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_FUNCTION_WORKER);
- }
- return this.workerService;
- }
+import io.swagger.annotations.Api;
+import javax.ws.rs.Path;
+import org.apache.pulsar.broker.admin.impl.FunctionsBase;
+@Path("/functions")
+@Api(value = "/functions", description = "Functions admin apis", tags = "functions")
+public class Functions extends FunctionsBase {
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
similarity index 55%
copy from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index 7d8b054..cc68677 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -16,26 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.worker.rest;
+package org.apache.pulsar.broker.admin.v2;
-import javax.servlet.ServletContext;
-import javax.ws.rs.core.Context;
-import org.apache.pulsar.functions.worker.WorkerService;
-
-public class FunctionApiResource {
-
- public static final String ATTRIBUTE_FUNCTION_WORKER = "function-worker";
-
- private WorkerService workerService;
-
- @Context
- protected ServletContext servletContext;
-
- public synchronized WorkerService worker() {
- if (this.workerService == null) {
- this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_FUNCTION_WORKER);
- }
- return this.workerService;
- }
+import io.swagger.annotations.Api;
+import javax.ws.rs.Path;
+import org.apache.pulsar.broker.admin.impl.FunctionsBase;
+@Path("/functions")
+@Api(value = "/functions", description = "Functions admin apis", tags = "functions")
+public class Functions extends FunctionsBase { // v2 binding under /functions; endpoints are inherited from FunctionsBase
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 20ff44b..02c8b1a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -49,6 +49,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
@@ -120,6 +121,7 @@ public class WebService implements AutoCloseable {
ResourceConfig config = new ResourceConfig();
config.packages("jersey.config.server.provider.packages", javaPackages);
config.register(provider);
+ config.register(MultiPartFeature.class);
ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
servletHolder.setAsyncSupported(true);
addServlet(basePath, servletHolder, requiresAuthentication, attributeMap);
diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh
index f91e6ff..9f8eb5f 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -25,10 +25,11 @@ rm -rf ./pulsar-dist
mkdir pulsar-dist
tar xfz ../all/target/apache-pulsar*bin.tar.gz -C pulsar-dist --strip-components 1
-PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone.conf pulsar-dist/bin/pulsar standalone > broker.log &
+PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone.conf pulsar-dist/bin/pulsar standalone --no-functions-worker > broker.log &
standalone_pid=$!;
PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone-ssl.conf pulsar-dist/bin/pulsar standalone \
+ --no-functions-worker \
--zookeeper-port 2191 --bookkeeper-port 3191 \
--zookeeper-dir data2/standalone/zookeeper --bookkeeper-dir \
data2/standalone/bookkeeper > broker-tls.log &
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-client-tools-test/pom.xml
similarity index 54%
copy from pulsar-zookeeper-utils/pom.xml
copy to pulsar-client-tools-test/pom.xml
index 0a621f0..0aefc77 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-client-tools-test/pom.xml
@@ -21,7 +21,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar</artifactId>
@@ -29,90 +28,57 @@
<relativePath>..</relativePath>
</parent>
- <artifactId>pulsar-zookeeper-utils</artifactId>
- <name>Pulsar ZooKeeper Utils</name>
+ <artifactId>pulsar-client-tools-test</artifactId>
+ <name>Pulsar Client Tools Test</name>
+ <description>Pulsar Client Tools Test</description>
<dependencies>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.bookkeeper</groupId>
- <artifactId>bookkeeper-server-shaded</artifactId>
- <version>${bookkeeper.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
- <groupId>org.apache.bookkeeper.stats</groupId>
- <artifactId>prometheus-metrics-provider</artifactId>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-tools</artifactId>
+ <version>${project.version}</version>
</dependency>
-
<dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
-
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <classifier>tests</classifier>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-common</artifactId>
- <version>${project.parent.version}</version>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
</dependency>
-
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>managed-ledger</artifactId>
- <version>${project.parent.version}</version>
+ <artifactId>pulsar-zookeeper-utils</artifactId>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>managed-ledger</artifactId>
- <version>${project.parent.version}</version>
- <type>test-jar</type>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
-
</dependencies>
-
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
</plugin>
</plugins>
</build>
-
</project>
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
similarity index 100%
rename from pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
rename to pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java
similarity index 100%
rename from pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java
rename to pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/utils/NameValueParameterSplitterTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/NameValueParameterSplitterTest.java
similarity index 100%
rename from pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/utils/NameValueParameterSplitterTest.java
rename to pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/NameValueParameterSplitterTest.java
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
similarity index 100%
rename from pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
rename to pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 1d2454d..6ec1e80 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -19,89 +19,58 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar</artifactId>
- <version>2.0.0-incubating-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>2.0.0-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
- <artifactId>pulsar-client-tools</artifactId>
- <name>Pulsar Client Tools</name>
- <description>Pulsar Client Tools</description>
+ <artifactId>pulsar-client-tools</artifactId>
+ <name>Pulsar Client Tools</name>
+ <description>Pulsar Client Tools</description>
- <dependencies>
- <dependency>
- <groupId>com.beust</groupId>
- <artifactId>jcommander</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-admin-original</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-original</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.asynchttpclient</groupId>
- <artifactId>async-http-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>managed-ledger</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-broker</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-broker</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-zookeeper-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-admin-original</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.asynchttpclient</groupId>
+ <artifactId>async-http-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ </dependencies>
</project>
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index e227d6d..ee246f3 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -48,7 +48,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <version>2.4.1</version>
+ <version>${protobuf2.version}</version>
</dependency>
<dependency>
diff --git a/pulsar-functions/bin/pulsar b/pulsar-functions/bin/pulsar
index 4c35fdc..ba219a1 100755
--- a/pulsar-functions/bin/pulsar
+++ b/pulsar-functions/bin/pulsar
@@ -52,7 +52,7 @@ fi
# exclude tests jar
if [ -z "$PULSAR_JAR" ]; then
- BUILT_JAR=`ls $PULSAR_HOME/worker/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
+ BUILT_JAR=`ls $PULSAR_HOME/worker-runner/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ -z "${BUILT_JAR}" ]; then
echo "\nCouldn't find pulsar jar.";
echo "Make sure you've run 'mvn package'\n";
@@ -118,10 +118,10 @@ add_maven_deps_to_classpath() {
# Need to generate classpath from maven pom. This is costly so generate it
# and cache it. Save the file into our target dir so a mvn clean will get
# clean it up and force us create a new one.
- f="${PULSAR_HOME}/worker/target/classpath_shaded.txt"
+ f="${PULSAR_HOME}/worker-runner/target/classpath_shaded.txt"
if [ ! -f "${f}" ]
then
- ${MVN} -f "${PULSAR_HOME}/worker/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
+ ${MVN} -f "${PULSAR_HOME}/worker-runner/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
fi
PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}
diff --git a/pulsar-functions/cli/pom.xml b/pulsar-functions/cli/pom.xml
index b22a502..2345e25 100644
--- a/pulsar-functions/cli/pom.xml
+++ b/pulsar-functions/cli/pom.xml
@@ -33,20 +33,6 @@
<dependencies>
- <!-- logging -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- </dependency>
-
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-runtime</artifactId>
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 39dd271..aeb53ae 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -57,7 +57,6 @@
<version>${project.version}</version>
</dependency>
- <!-- update this when bookkeeper produces a snapshot version -->
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-java-client</artifactId>
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index 47450a8..2a6daac 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -39,6 +39,8 @@
<module>instance</module>
<module>runtime</module>
<module>worker</module>
+ <module>worker-shaded</module>
+ <module>worker-runner</module>
<module>cli</module>
<module>java-examples</module>
<module>dist</module>
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index dc236b6..88828dc 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -45,6 +45,11 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-all</artifactId>
+ </dependency>
+
</dependencies>
<build>
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index b71931d..b5cb83e 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -33,20 +33,6 @@
<dependencies>
- <!-- logging -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- </dependency>
-
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-tools-shaded</artifactId>
diff --git a/pulsar-functions/worker-runner/pom.xml b/pulsar-functions/worker-runner/pom.xml
new file mode 100644
index 0000000..1e621d5
--- /dev/null
+++ b/pulsar-functions/worker-runner/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions</artifactId>
+ <version>2.0.0-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-functions-worker-runner</artifactId>
+ <name>Pulsar Functions :: Worker Runner</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-worker-shaded</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-lite</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf.nano</groupId>
+ <artifactId>protobuf-javanano</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf-lite</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf-nano</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-proto</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-utils</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-metrics</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-instance</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-worker</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-admin-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/pulsar-functions/worker-shaded/pom.xml b/pulsar-functions/worker-shaded/pom.xml
new file mode 100644
index 0000000..670c8b7
--- /dev/null
+++ b/pulsar-functions/worker-shaded/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions</artifactId>
+ <version>2.0.0-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-functions-worker-shaded</artifactId>
+ <name>Pulsar Functions :: Worker Shaded</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-worker</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <!-- exclude the dependencies that already exist in bookkeeper-server-shaded -->
+ <exclusion>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-common</artifactId>
+ </exclusion>
+ <!-- exclude `pulsar-client-tools-shaded` here; this allows worker-runner and broker to use unshaded clients -->
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-tools-shaded</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- bookkeeper-server-shaded includes circe-checksum, bookkeeper-common and bookkeeper-stats-api -->
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-server-shaded</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <minimizeJar>false</minimizeJar>
+
+ <artifactSet>
+ <!-- package the dependencies that use protobuf & grpc and relocate protobuf -->
+ <includes>
+ <include>com.google.protobuf:protobuf-lite</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.protobuf.nano:protobuf-javanano</include>
+ <include>com.google.protobuf:protobuf-java-util</include>
+ <include>com.google.instrumentation:instrumentation-api</include>
+ <include>com.google.api.grpc:proto-google-common-protos</include>
+ <!-- dependencies that use protobuf -->
+ <include>org.apache.pulsar:pulsar-functions-proto</include>
+ <include>org.apache.pulsar:pulsar-functions-utils</include>
+ <include>org.apache.pulsar:pulsar-functions-metrics</include>
+ <include>org.apache.pulsar:pulsar-functions-instance</include>
+ <include>org.apache.pulsar:pulsar-functions-runtime</include>
+ <include>org.apache.pulsar:pulsar-functions-worker</include>
+ <!-- protobuf dependencies in grpc -->
+ <include>io.grpc:*</include>
+ <!-- bookkeeper key/value service -->
+ <include>org.apache.bookkeeper:stream-storage-java-client</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <!-- bookkeeper shading rule -->
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+ <shadedPattern>org.apache.pulsar.functions.shaded.com.google.protobuf</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 24f690f..6f408e7 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -34,20 +34,6 @@
<dependencies>
- <!-- logging -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- </dependency>
-
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-runtime</artifactId>
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 754c8af..4942b34 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -160,7 +160,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
public static WorkerInfo parseFrom(String str) {
String[] tokens = str.split(":");
if (tokens.length != 3) {
- throw new IllegalArgumentException("Invalid string to parse WorkerInfo");
+ throw new IllegalArgumentException("Invalid string to parse WorkerInfo : " + str);
}
String workerId = tokens[0];
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 55450e6..09f9fc0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -20,10 +20,7 @@ package org.apache.pulsar.functions.worker;
import com.google.common.util.concurrent.AbstractService;
-import java.io.IOException;
-import java.net.URI;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
@Slf4j
@@ -51,13 +48,7 @@ public class Worker extends AbstractService {
}
protected void doStartImpl() throws InterruptedException {
- URI dlogUri;
- try {
- dlogUri = FunctionMetadataSetup.setupFunctionMetadata(workerConfig);
- } catch (PulsarAdminException | IOException e) {
- throw new RuntimeException(e);
- }
- workerService.start(dlogUri);
+ workerService.start();
WorkerServer server = new WorkerServer(workerService);
this.serverThread = new Thread(server, server.getThreadName());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 5793e97..732ab07 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.functions.worker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
import java.net.URI;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
@@ -71,7 +73,15 @@ public class WorkerService {
return contextHandler;
}
- public void start(URI dlogUri) throws InterruptedException {
+ public void start() throws InterruptedException {
+ try {
+ start(FunctionMetadataSetup.setupFunctionMetadata(workerConfig));
+ } catch (PulsarAdminException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void start(URI dlogUri) throws InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());
try {
log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter()
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 7d8b054..1c5c739 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -18,24 +18,30 @@
*/
package org.apache.pulsar.functions.worker.rest;
+import java.util.function.Supplier;
import javax.servlet.ServletContext;
import javax.ws.rs.core.Context;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-public class FunctionApiResource {
+public class FunctionApiResource implements Supplier<WorkerService> {
public static final String ATTRIBUTE_FUNCTION_WORKER = "function-worker";
+ protected final FunctionsImpl functions;
private WorkerService workerService;
-
@Context
protected ServletContext servletContext;
- public synchronized WorkerService worker() {
+ public FunctionApiResource() {
+ this.functions = new FunctionsImpl(this);
+ }
+
+ @Override
+ public synchronized WorkerService get() {
if (this.workerService == null) {
this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_FUNCTION_WORKER);
}
return this.workerService;
}
-
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
similarity index 91%
copy from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
copy to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 7227196..4eda822 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -16,13 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api;
-import com.google.gson.Gson;
-import javax.ws.rs.core.Response.Status;
+import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.ErrorData;
@@ -36,33 +56,28 @@ import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
@Slf4j
-@Path("/functions")
-public class FunctionApiV2Resource extends FunctionApiResource {
+public class FunctionsImpl {
+
+ private final Supplier<WorkerService> workerServiceSupplier;
+
+ public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
+ this.workerServiceSupplier = workerServiceSupplier;
+ }
+
+ private WorkerService worker() {
+ try {
+ return checkNotNull(workerServiceSupplier.get());
+ } catch (Throwable t) {
+ log.info("Failed to get worker service", t);
+ throw t;
+ }
+ }
@POST
@Path("/{tenant}/{namespace}/{functionName}")
@@ -73,7 +88,6 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("functionConfig") String functionConfigJson) {
-
FunctionConfig functionConfig;
// validate parameters
try {
@@ -82,7 +96,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
} catch (IllegalArgumentException e) {
log.error("Invalid register function request @ /{}/{}/{}",
tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
@@ -91,7 +105,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(String.format("Function %s already exist", functionName))).build();
}
@@ -132,7 +146,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
} catch (IllegalArgumentException e) {
log.error("Invalid update function request @ /{}/{}/{}",
tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
@@ -140,7 +154,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
}
@@ -176,7 +190,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
} catch (IllegalArgumentException e) {
log.error("Invalid deregister function request @ /{}/{}/{}",
tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
@@ -197,7 +211,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
try {
requestResult = completableFuture.get();
if (!requestResult.isSuccess()) {
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(requestResult.getMessage()))
.build();
@@ -217,7 +231,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
.build();
}
- return Response.status(Response.Status.OK).entity(requestResult.toJson()).build();
+ return Response.status(Status.OK).entity(requestResult.toJson()).build();
}
@GET
@@ -233,7 +247,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
} catch (IllegalArgumentException e) {
log.error("Invalid getFunction request @ /{}/{}/{}",
tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
@@ -249,7 +263,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
String functionConfigJson = JsonFormat.printer().print(functionMetaData.getFunctionConfig());
- return Response.status(Response.Status.OK).entity(functionConfigJson).build();
+ return Response.status(Status.OK).entity(functionConfigJson).build();
}
@GET
@@ -265,7 +279,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
} catch (IllegalArgumentException e) {
log.error("Invalid getFunctionStatus request @ /{}/{}/{}",
tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
@@ -280,7 +294,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
- InstanceCommunication.FunctionStatus functionStatus = null;
+ FunctionStatus functionStatus = null;
try {
functionStatus = functionRuntimeManager.getFunctionInstanceStatus(
tenant, namespace, functionName, Integer.parseInt(instanceId));
@@ -289,11 +303,11 @@ public class FunctionApiV2Resource extends FunctionApiResource {
FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
String functionConfigJson = JsonFormat.printer().print(functionStatusBuilder.build());
- return Response.status(Response.Status.OK).entity(functionConfigJson).build();
+ return Response.status(Status.OK).entity(functionConfigJson).build();
}
String jsonResponse = JsonFormat.printer().print(functionStatus);
- return Response.status(Response.Status.OK).entity(jsonResponse).build();
+ return Response.status(Status.OK).entity(jsonResponse).build();
}
@GET
@@ -308,7 +322,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
} catch (IllegalArgumentException e) {
log.error("Invalid getFunctionStatus request @ /{}/{}/{}",
tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
@@ -331,11 +345,11 @@ public class FunctionApiV2Resource extends FunctionApiResource {
FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
String functionConfigJson = JsonFormat.printer().print(functionStatusBuilder.build());
- return Response.status(Response.Status.OK).entity(functionConfigJson).build();
+ return Response.status(Status.OK).entity(functionConfigJson).build();
}
String jsonResponse = JsonFormat.printer().print(functionStatusList);
- return Response.status(Response.Status.OK).entity(jsonResponse).build();
+ return Response.status(Status.OK).entity(jsonResponse).build();
}
@GET
@@ -349,7 +363,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
} catch (IllegalArgumentException e) {
log.error("Invalid listFunctions request @ /{}/{}",
tenant, namespace, e);
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
@@ -358,7 +372,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
Collection<String> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
- return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
+ return Response.status(Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
}
private Response updateRequest(FunctionMetaData functionMetaData,
@@ -389,7 +403,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
try {
requestResult = completableFuture.get();
if (!requestResult.isSuccess()) {
- return Response.status(Response.Status.BAD_REQUEST)
+ return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(requestResult.getMessage()))
.build();
@@ -406,7 +420,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
.build();
}
- return Response.status(Response.Status.OK).build();
+ return Response.status(Status.OK).build();
}
@GET
@@ -414,7 +428,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response getCluster() {
MembershipManager membershipManager = worker().getMembershipManager();
List<MembershipManager.WorkerInfo> members = membershipManager.getCurrentMembership();
- return Response.status(Response.Status.OK).entity(new Gson().toJson(members)).build();
+ return Response.status(Status.OK).entity(new Gson().toJson(members)).build();
}
@GET
@@ -426,7 +440,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) {
ret.put(entry.getKey(), entry.getValue().keySet());
}
- return Response.status(Response.Status.OK).entity(
+ return Response.status(Status.OK).entity(
new Gson().toJson(ret)).build();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 7227196..88cd1a4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -18,25 +18,8 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v2;
-import com.google.gson.Gson;
-import javax.ws.rs.core.Response.Status;
-
import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionConfig;
-import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.MembershipManager;
-import org.apache.pulsar.functions.worker.Utils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
@@ -52,13 +35,6 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
@Slf4j
@Path("/functions")
@@ -74,44 +50,9 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("functionConfig") String functionConfigJson) {
- FunctionConfig functionConfig;
- // validate parameters
- try {
- functionConfig = validateUpdateRequestParams(tenant, namespace, functionName,
- uploadedInputStream, fileDetail, functionConfigJson);
- } catch (IllegalArgumentException e) {
- log.error("Invalid register function request @ /{}/{}/{}",
- tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getMessage())).build();
- }
-
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
- if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s already exist", functionName))).build();
- }
-
- // function state
- FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
- .setFunctionConfig(functionConfig)
- .setCreateTime(System.currentTimeMillis())
- .setVersion(0);
+ return functions.registerFunction(
+ tenant, namespace, functionName, uploadedInputStream, fileDetail, functionConfigJson);
- PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder()
- .setPackagePath(String.format(
- "%s/%s/%s/%s",
- tenant,
- namespace,
- functionName,
- Utils.getUniquePackageName(fileDetail.getFileName())));
- functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-
- return updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
}
@PUT
@@ -124,43 +65,9 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("functionConfig") String functionConfigJson) {
- FunctionConfig functionConfig;
- // validate parameters
- try {
- functionConfig = validateUpdateRequestParams(tenant, namespace, functionName,
- uploadedInputStream, fileDetail, functionConfigJson);
- } catch (IllegalArgumentException e) {
- log.error("Invalid update function request @ /{}/{}/{}",
- tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getMessage())).build();
- }
-
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
- }
+ return functions.updateFunction(
+ tenant, namespace, functionName, uploadedInputStream, fileDetail, functionConfigJson);
- // function state
- FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
- .setFunctionConfig(functionConfig)
- .setCreateTime(System.currentTimeMillis())
- .setVersion(0);
-
- PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder()
- .setPackagePath(String.format(
- "%s/%s/%s/%s",
- tenant,
- namespace,
- functionName,
- Utils.getUniquePackageName(fileDetail.getFileName())));
- functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-
- return updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
}
@@ -169,55 +76,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response deregisterFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
-
- // validate parameters
- try {
- validateDeregisterRequestParams(tenant, namespace, functionName);
- } catch (IllegalArgumentException e) {
- log.error("Invalid deregister function request @ /{}/{}/{}",
- tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getMessage())).build();
- }
-
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function to deregister does not exist @ /{}/{}/{}",
- tenant, namespace, functionName);
- return Response.status(Status.NOT_FOUND)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
- }
-
- CompletableFuture<RequestResult> completableFuture
- = functionMetaDataManager.deregisterFunction(tenant, namespace, functionName);
-
- RequestResult requestResult = null;
- try {
- requestResult = completableFuture.get();
- if (!requestResult.isSuccess()) {
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(requestResult.getMessage()))
- .build();
- }
- } catch (ExecutionException e) {
- log.error("Execution Exception while deregistering function @ /{}/{}/{}",
- tenant, namespace, functionName, e);
- return Response.serverError()
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getCause().getMessage()))
- .build();
- } catch (InterruptedException e) {
- log.error("Interrupted Exception while deregistering function @ /{}/{}/{}",
- tenant, namespace, functionName, e);
- return Response.status(Status.REQUEST_TIMEOUT)
- .type(MediaType.APPLICATION_JSON)
- .build();
- }
-
- return Response.status(Response.Status.OK).entity(requestResult.toJson()).build();
+ return functions.deregisterFunction(
+ tenant, namespace, functionName);
}
@GET
@@ -226,30 +86,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName)
throws InvalidProtocolBufferException {
-
- // validate parameters
- try {
- validateGetFunctionRequestParams(tenant, namespace, functionName);
- } catch (IllegalArgumentException e) {
- log.error("Invalid getFunction request @ /{}/{}/{}",
- tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getMessage())).build();
- }
-
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function in getFunction does not exist @ /{}/{}/{}",
- tenant, namespace, functionName);
- return Response.status(Status.NOT_FOUND)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
- }
-
- FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
- String functionConfigJson = JsonFormat.printer().print(functionMetaData.getFunctionConfig());
- return Response.status(Response.Status.OK).entity(functionConfigJson).build();
+ return functions.getFunctionInfo(
+ tenant, namespace, functionName);
}
@GET
@@ -258,42 +96,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
-
- // validate parameters
- try {
- validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
- } catch (IllegalArgumentException e) {
- log.error("Invalid getFunctionStatus request @ /{}/{}/{}",
- tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getMessage())).build();
- }
-
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}",
- tenant, namespace, functionName);
- return Response.status(Status.NOT_FOUND)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
- }
-
- FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
- InstanceCommunication.FunctionStatus functionStatus = null;
- try {
- functionStatus = functionRuntimeManager.getFunctionInstanceStatus(
- tenant, namespace, functionName, Integer.parseInt(instanceId));
- } catch (Exception e) {
- log.error("Got Exception Getting Status", e);
- FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
- functionStatusBuilder.setRunning(false);
- String functionConfigJson = JsonFormat.printer().print(functionStatusBuilder.build());
- return Response.status(Response.Status.OK).entity(functionConfigJson).build();
- }
-
- String jsonResponse = JsonFormat.printer().print(functionStatus);
- return Response.status(Response.Status.OK).entity(jsonResponse).build();
+ return functions.getFunctionInstanceStatus(
+ tenant, namespace, functionName, instanceId);
}
@GET
@@ -301,241 +105,29 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response getFunctionStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
-
- // validate parameters
- try {
- validateGetFunctionRequestParams(tenant, namespace, functionName);
- } catch (IllegalArgumentException e) {
- log.error("Invalid getFunctionStatus request @ /{}/{}/{}",
- tenant, namespace, functionName, e);
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getMessage())).build();
- }
-
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}",
- tenant, namespace, functionName);
- return Response.status(Status.NOT_FOUND)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
- }
-
- FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
- InstanceCommunication.FunctionStatusList functionStatusList = null;
- try {
- functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName);
- } catch (Exception e) {
- log.error("Got Exception Getting Status", e);
- FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
- functionStatusBuilder.setRunning(false);
- String functionConfigJson = JsonFormat.printer().print(functionStatusBuilder.build());
- return Response.status(Response.Status.OK).entity(functionConfigJson).build();
- }
-
- String jsonResponse = JsonFormat.printer().print(functionStatusList);
- return Response.status(Response.Status.OK).entity(jsonResponse).build();
+ return functions.getFunctionStatus(
+ tenant, namespace, functionName);
}
@GET
@Path("/{tenant}/{namespace}")
public Response listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
+ return functions.listFunctions(
+ tenant, namespace);
- // validate parameters
- try {
- validateListFunctionRequestParams(tenant, namespace);
- } catch (IllegalArgumentException e) {
- log.error("Invalid listFunctions request @ /{}/{}",
- tenant, namespace, e);
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getMessage())).build();
- }
-
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
- Collection<String> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
-
- return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
- }
-
- private Response updateRequest(FunctionMetaData functionMetaData,
- InputStream uploadedInputStream) {
- // Upload to bookkeeper
- try {
- log.info("Uploading function package to {}", functionMetaData.getPackageLocation());
-
- Utils.uploadToBookeeper(
- worker().getDlogNamespace(),
- uploadedInputStream,
- functionMetaData.getPackageLocation().getPackagePath());
- } catch (IOException e) {
- log.error("Error uploading file {}", functionMetaData.getPackageLocation(), e);
- return Response.serverError()
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getMessage()))
- .build();
- }
-
- // Submit to FMT
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
- CompletableFuture<RequestResult> completableFuture
- = functionMetaDataManager.updateFunction(functionMetaData);
-
- RequestResult requestResult = null;
- try {
- requestResult = completableFuture.get();
- if (!requestResult.isSuccess()) {
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(requestResult.getMessage()))
- .build();
- }
- } catch (ExecutionException e) {
- return Response.serverError()
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getCause().getMessage()))
- .build();
- } catch (InterruptedException e) {
- return Response.status(Status.REQUEST_TIMEOUT)
- .type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getCause().getMessage()))
- .build();
- }
-
- return Response.status(Response.Status.OK).build();
}
@GET
@Path("/cluster")
public Response getCluster() {
- MembershipManager membershipManager = worker().getMembershipManager();
- List<MembershipManager.WorkerInfo> members = membershipManager.getCurrentMembership();
- return Response.status(Response.Status.OK).entity(new Gson().toJson(members)).build();
+ return functions.getCluster();
}
@GET
@Path("/assignments")
public Response getAssignments() {
- FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
- Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments();
- Map<String, Collection<String>> ret = new HashMap<>();
- for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) {
- ret.put(entry.getKey(), entry.getValue().keySet());
- }
- return Response.status(Response.Status.OK).entity(
- new Gson().toJson(ret)).build();
- }
-
- private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
-
- if (tenant == null) {
- throw new IllegalArgumentException("Tenant is not provided");
- }
- if (namespace == null) {
- throw new IllegalArgumentException("Namespace is not provided");
- }
- }
-
- private void validateGetFunctionInstanceRequestParams(String tenant,
- String namespace,
- String functionName,
- String instanceId) throws IllegalArgumentException {
- validateGetFunctionRequestParams(tenant, namespace, functionName);
- if (instanceId == null) {
- throw new IllegalArgumentException("Function Instance Id is not provided");
-
- }
- }
-
- private void validateGetFunctionRequestParams(String tenant,
- String namespace,
- String functionName) throws IllegalArgumentException {
-
- if (tenant == null) {
- throw new IllegalArgumentException("Tenant is not provided");
- }
- if (namespace == null) {
- throw new IllegalArgumentException("Namespace is not provided");
- }
- if (functionName == null) {
- throw new IllegalArgumentException("Function Name is not provided");
- }
- }
-
- private void validateDeregisterRequestParams(String tenant,
- String namespace,
- String functionName) throws IllegalArgumentException {
-
- if (tenant == null) {
- throw new IllegalArgumentException("Tenant is not provided");
- }
- if (namespace == null) {
- throw new IllegalArgumentException("Namespace is not provided");
- }
- if (functionName == null) {
- throw new IllegalArgumentException("Function Name is not provided");
- }
- }
-
- private FunctionConfig validateUpdateRequestParams(String tenant,
- String namespace,
- String functionName,
- InputStream uploadedInputStream,
- FormDataContentDisposition fileDetail,
- String functionConfigJson) throws IllegalArgumentException {
- if (tenant == null) {
- throw new IllegalArgumentException("Tenant is not provided");
- }
- if (namespace == null) {
- throw new IllegalArgumentException("Namespace is not provided");
- }
- if (functionName == null) {
- throw new IllegalArgumentException("Function Name is not provided");
- }
- if (uploadedInputStream == null || fileDetail == null) {
- throw new IllegalArgumentException("Function Package is not provided");
- }
- if (functionConfigJson == null) {
- throw new IllegalArgumentException("FunctionConfig is not provided");
- }
- try {
- FunctionConfig.Builder functionConfigBuilder = FunctionConfig.newBuilder();
- JsonFormat.parser().merge(functionConfigJson, functionConfigBuilder);
- FunctionConfig functionConfig = functionConfigBuilder.build();
-
- List<String> missingFields = new LinkedList<>();
- if (functionConfig.getTenant() == null || functionConfig.getTenant().isEmpty()) {
- missingFields.add("Tenant");
- }
- if (functionConfig.getNamespace() == null || functionConfig.getNamespace().isEmpty()) {
- missingFields.add("Namespace");
- }
- if (functionConfig.getName() == null || functionConfig.getName().isEmpty()) {
- missingFields.add("Name");
- }
- if (functionConfig.getClassName() == null || functionConfig.getClassName().isEmpty()) {
- missingFields.add("ClassName");
- }
- if (functionConfig.getInputsCount() == 0 && functionConfig.getCustomSerdeInputsCount() == 0) {
- missingFields.add("Input");
- }
- if (!missingFields.isEmpty()) {
- String errorMessage = StringUtils.join(missingFields, ",");
- throw new IllegalArgumentException(errorMessage + " is not provided");
- }
- if (functionConfig.getParallelism() <= 0) {
- throw new IllegalArgumentException("Parallelism needs to be set to a positive number");
- }
- return functionConfig;
- } catch (IllegalArgumentException ex) {
- throw ex;
- } catch (Exception ex) {
- throw new IllegalArgumentException("Invalid FunctionConfig");
- }
+ return functions.getAssignments();
}
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index fe75940..8bce7ff 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.worker.rest.api.v2;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -57,6 +56,7 @@ import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -98,13 +98,12 @@ public class FunctionApiV2ResourceTest {
private WorkerService mockedWorkerService;
private FunctionMetaDataManager mockedManager;
private Namespace mockedNamespace;
- private FunctionApiV2Resource resource;
+ private FunctionsImpl resource;
private InputStream mockedInputStream;
private FormDataContentDisposition mockedFormData;
@BeforeMethod
public void setup() {
- this.resource = spy(new FunctionApiV2Resource());
this.mockedManager = mock(FunctionMetaDataManager.class);
this.mockedInputStream = mock(InputStream.class);
this.mockedFormData = mock(FormDataContentDisposition.class);
@@ -124,7 +123,7 @@ public class FunctionApiV2ResourceTest {
.setPulsarServiceUrl("pulsar://localhost:6650/");
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
- doReturn(mockedWorkerService).when(resource).worker();
+ this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
}
//
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 0a621f0..b8019a5 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -42,21 +42,6 @@
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server-shaded</artifactId>
- <version>${bookkeeper.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.