You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/19 17:00:32 UTC
[pulsar] branch master updated: Added stop/restart functionality in
sources/sinks (#2810)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f3a027b Added stop/restart functionality in sources/sinks (#2810)
f3a027b is described below
commit f3a027b45f8244d19ac24709edc7f762abb5ce6f
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Oct 19 10:00:25 2018 -0700
Added stop/restart functionality in sources/sinks (#2810)
* Added Get and List source/sink functionality
* Fixed compile
* Removed test that doesn't make sense any more
* Fixed build
* Fixed logic
* Return error response
* Return response on error
* Fix unittest
* Fixed unittest
* Fixed unittest
* Fixed unittest
* Added get/list sinks tests
* Added get/list tests
* Add more unittests
* Added more unittests
* Added TODO
* Took feedback
* Fix unittest
* Fix unittest
* Fix unittest
* Fixed integration tests
* Fixed integration test
* Added restart/stop functionality to the sources/sinks
* Added getstatus method to sources/sink
* Fix integration tests
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 12 +-
.../apache/pulsar/broker/admin/impl/SinkBase.java | 12 +-
.../pulsar/broker/admin/impl/SourceBase.java | 12 +-
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 69 +++++++++++
.../org/apache/pulsar/admin/cli/CmdSources.java | 91 +++++++++++++--
.../functions/worker/rest/api/FunctionsImpl.java | 129 +++++++++++++--------
.../worker/rest/api/v2/FunctionApiV2Resource.java | 12 +-
.../worker/rest/api/v2/SinkApiV2Resource.java | 12 +-
.../worker/rest/api/v2/SourceApiV2Resource.java | 12 +-
.../integration/functions/PulsarFunctionsTest.java | 39 ++++++-
10 files changed, 299 insertions(+), 101 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index b50da21..01dd354 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -162,7 +162,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
- tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
}
@GET
@@ -179,7 +179,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStatus(
- tenant, namespace, functionName, uri.getRequestUri());
+ tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
}
@GET
@@ -256,7 +256,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.restartFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
}
@POST
@@ -268,7 +268,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Consumes(MediaType.APPLICATION_JSON)
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
- return functions.restartFunctionInstances(tenant, namespace, functionName);
+ return functions.restartFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
}
@POST
@@ -281,7 +281,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.stopFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
}
@POST
@@ -293,7 +293,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Consumes(MediaType.APPLICATION_JSON)
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
- return functions.stopFunctionInstances(tenant, namespace, functionName);
+ return functions.stopFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 0f5a5c5..ba7dedc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -148,7 +148,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
- tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
}
@GET
@@ -164,7 +164,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public Response getSinkStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) throws IOException {
- return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
+ return functions.getFunctionStatus(tenant, namespace, sinkName, FunctionsImpl.SINK, uri.getRequestUri());
}
@GET
@@ -194,7 +194,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public Response restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ return functions.restartFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
}
@POST
@@ -206,7 +206,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
@Consumes(MediaType.APPLICATION_JSON)
public Response restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
- return functions.restartFunctionInstances(tenant, namespace, sinkName);
+ return functions.restartFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
}
@POST
@@ -219,7 +219,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public Response stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ return functions.stopFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
}
@POST
@@ -231,7 +231,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
@Consumes(MediaType.APPLICATION_JSON)
public Response stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
- return functions.stopFunctionInstances(tenant, namespace, sinkName);
+ return functions.stopFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 4bda489..c695a1a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -150,7 +150,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
- tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
}
@GET
@@ -166,7 +166,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public Response getSourceStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) throws IOException {
- return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri());
+ return functions.getFunctionStatus(tenant, namespace, sourceName, FunctionsImpl.SOURCE, uri.getRequestUri());
}
@GET
@@ -197,7 +197,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public Response restartSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ return functions.restartFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
}
@POST
@@ -209,7 +209,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
@Consumes(MediaType.APPLICATION_JSON)
public Response restartSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
- return functions.restartFunctionInstances(tenant, namespace, sourceName);
+ return functions.restartFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
}
@POST
@@ -222,7 +222,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public Response stopSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ return functions.stopFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
}
@POST
@@ -234,7 +234,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
@Consumes(MediaType.APPLICATION_JSON)
public Response stopSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
- return functions.stopFunctionInstances(tenant, namespace, sourceName);
+ return functions.stopFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
}
@GET
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 3b9159e..37d2d9b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -31,6 +31,7 @@ import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import java.io.File;
@@ -67,6 +68,9 @@ public class CmdSinks extends CmdBase {
private final DeleteSink deleteSink;
private final ListSinks listSinks;
private final GetSink getSink;
+ private final GetSinkStatus getSinkStatus;
+ private final StopSink stopSink;
+ private final RestartSink restartSink;
private final LocalSinkRunner localSinkRunner;
public CmdSinks(PulsarAdmin admin) {
@@ -76,6 +80,9 @@ public class CmdSinks extends CmdBase {
deleteSink = new DeleteSink();
listSinks = new ListSinks();
getSink = new GetSink();
+ getSinkStatus = new GetSinkStatus();
+ stopSink = new StopSink();
+ restartSink = new RestartSink();
localSinkRunner = new LocalSinkRunner();
jcommander.addCommand("create", createSink);
@@ -83,6 +90,9 @@ public class CmdSinks extends CmdBase {
jcommander.addCommand("delete", deleteSink);
jcommander.addCommand("list", listSinks);
jcommander.addCommand("get", getSink);
+ jcommander.addCommand("getstatus", getSinkStatus);
+ jcommander.addCommand("stop", stopSink);
+ jcommander.addCommand("restart", restartSink);
jcommander.addCommand("localrun", localSinkRunner);
jcommander.addCommand("available-sinks", new ListBuiltInSinks());
}
@@ -590,6 +600,65 @@ public class CmdSinks extends CmdBase {
}
}
+    /**
+     * CLI command that prints the status of a sink as pretty-printed JSON.
+     * With {@code --instance-id} the status of that single instance is requested;
+     * without it the status of the sink as a whole is requested.
+     */
+    @Parameters(commandDescription = "Check the current status of a Pulsar Sink")
+    class GetSinkStatus extends SinkCommand {
+
+        @Parameter(names = "--instance-id", description = "The sink instanceId (Get-status of all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        /**
+         * Fetches the status through the admin client and writes it to stdout.
+         *
+         * @throws Exception any failure of the admin call; a non-numeric
+         *         {@code --instance-id} surfaces as a {@link NumberFormatException}
+         */
+        @Override
+        void runCmd() throws Exception {
+            String json = Utils.printJson(
+                    isBlank(instanceId) ? admin.sink().getSinkStatus(tenant, namespace, sinkName)
+                            : admin.sink().getSinkStatus(tenant, namespace, sinkName,
+                                    Integer.parseInt(instanceId)));
+            // Re-parse the JSON text so it can be emitted with pretty-printing enabled.
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(new JsonParser().parse(json)));
+        }
+    }
+
+    /**
+     * CLI command that restarts a sink: the single instance named by
+     * {@code --instance-id} when it is supplied, otherwise all instances.
+     */
+    @Parameters(commandDescription = "Restart sink instance")
+    class RestartSink extends SinkCommand {
+
+        @Parameter(names = "--instance-id", description = "The sink instanceId (restart all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        /**
+         * Sends the restart request through the admin client and reports success.
+         * A non-numeric {@code --instance-id} is reported on stderr and no request is sent.
+         *
+         * @throws Exception any exception raised by the admin client call
+         */
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                final int instance;
+                try {
+                    instance = Integer.parseInt(instanceId);
+                } catch (NumberFormatException e) {
+                    // Nothing was restarted, so do not fall through to the success message.
+                    System.err.println("instance-id must be a number");
+                    return;
+                }
+                admin.sink().restartSink(tenant, namespace, sinkName, instance);
+            } else {
+                admin.sink().restartSink(tenant, namespace, sinkName);
+            }
+            System.out.println("Restarted successfully");
+        }
+    }
+
+    /**
+     * CLI command that stops a sink: the single instance named by
+     * {@code --instance-id} when it is supplied, otherwise all instances.
+     */
+    @Parameters(commandDescription = "Temporarily stops sink instance. (If worker restarts then it reassigns and starts sink again)")
+    class StopSink extends SinkCommand {
+
+        @Parameter(names = "--instance-id", description = "The sink instanceId (stop all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        /**
+         * Sends the stop request through the admin client and reports success.
+         * A non-numeric {@code --instance-id} is reported on stderr and no request is sent.
+         *
+         * @throws Exception any exception raised by the admin client call
+         */
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                final int instance;
+                try {
+                    instance = Integer.parseInt(instanceId);
+                } catch (NumberFormatException e) {
+                    // Nothing was stopped, so do not fall through to the success message.
+                    System.err.println("instance-id must be a number");
+                    return;
+                }
+                admin.sink().stopSink(tenant, namespace, sinkName, instance);
+            } else {
+                admin.sink().stopSink(tenant, namespace, sinkName);
+            }
+            // This is the stop command; the original printed the restart message here.
+            System.out.println("Stopped successfully");
+        }
+    }
+
@Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster")
public class ListBuiltInSinks extends BaseCommand {
@Override
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index f27b0a5..5a1e9b3 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.admin.cli;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.functions.utils.Utils.fileExists;
@@ -29,6 +31,7 @@ import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import java.io.File;
@@ -69,8 +72,11 @@ public class CmdSources extends CmdBase {
private final CreateSource createSource;
private final DeleteSource deleteSource;
private final GetSource getSource;
+ private final GetSourceStatus getSourceStatus;
private final ListSources listSources;
private final UpdateSource updateSource;
+ private final RestartSource restartSource;
+ private final StopSource stopSource;
private final LocalSourceRunner localSourceRunner;
public CmdSources(PulsarAdmin admin) {
@@ -80,13 +86,19 @@ public class CmdSources extends CmdBase {
deleteSource = new DeleteSource();
listSources = new ListSources();
getSource = new GetSource();
+ getSourceStatus = new GetSourceStatus();
+ restartSource = new RestartSource();
+ stopSource = new StopSource();
localSourceRunner = new LocalSourceRunner();
jcommander.addCommand("create", createSource);
jcommander.addCommand("update", updateSource);
jcommander.addCommand("delete", deleteSource);
jcommander.addCommand("get", getSource);
+ jcommander.addCommand("getstatus", getSourceStatus);
jcommander.addCommand("list", listSources);
+ jcommander.addCommand("stop", stopSource);
+ jcommander.addCommand("restart", restartSource);
jcommander.addCommand("localrun", localSourceRunner);
jcommander.addCommand("available-sources", new ListBuiltInSources());
}
@@ -147,13 +159,13 @@ public class CmdSources extends CmdBase {
protected String tlsTrustCertFilePath;
private void mergeArgs() {
- if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
- if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
- if (!StringUtils.isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams;
+ if (!isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
+ if (!isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
+ if (!isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams;
if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls;
if (DEPRECATED_tlsAllowInsecureConnection != null) tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection;
if (DEPRECATED_tlsHostNameVerificationEnabled != null) tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled;
- if (!StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
+ if (!isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
}
@Override
@@ -280,11 +292,11 @@ public class CmdSources extends CmdBase {
private void mergeArgs() {
if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees;
- if (!StringUtils.isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName;
- if (!StringUtils.isBlank(DEPRECATED_deserializationClassName)) deserializationClassName = DEPRECATED_deserializationClassName;
- if (!StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className;
- if (!StringUtils.isBlank(DEPRECATED_sourceConfigFile)) sourceConfigFile = DEPRECATED_sourceConfigFile;
- if (!StringUtils.isBlank(DEPRECATED_sourceConfigString)) sourceConfigString = DEPRECATED_sourceConfigString;
+ if (!isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName;
+ if (!isBlank(DEPRECATED_deserializationClassName)) deserializationClassName = DEPRECATED_deserializationClassName;
+ if (!isBlank(DEPRECATED_className)) className = DEPRECATED_className;
+ if (!isBlank(DEPRECATED_sourceConfigFile)) sourceConfigFile = DEPRECATED_sourceConfigFile;
+ if (!isBlank(DEPRECATED_sourceConfigString)) sourceConfigString = DEPRECATED_sourceConfigString;
}
@Override
@@ -382,7 +394,7 @@ public class CmdSources extends CmdBase {
}
protected void validateSourceConfigs(SourceConfig sourceConfig) {
- if (StringUtils.isBlank(sourceConfig.getArchive())) {
+ if (isBlank(sourceConfig.getArchive())) {
throw new ParameterException("Source archive not specfied");
}
@@ -548,6 +560,65 @@ public class CmdSources extends CmdBase {
}
}
+    /**
+     * CLI command that prints the status of a source as pretty-printed JSON.
+     * With {@code --instance-id} the status of that single instance is requested;
+     * without it the status of the source as a whole is requested.
+     */
+    @Parameters(commandDescription = "Check the current status of a Pulsar Source")
+    class GetSourceStatus extends SourceCommand {
+
+        @Parameter(names = "--instance-id", description = "The source instanceId (Get-status of all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        /**
+         * Fetches the status through the admin client and writes it to stdout.
+         *
+         * @throws Exception any failure of the admin call; a non-numeric
+         *         {@code --instance-id} surfaces as a {@link NumberFormatException}
+         */
+        @Override
+        void runCmd() throws Exception {
+            String json = Utils.printJson(
+                    isBlank(instanceId) ? admin.source().getSourceStatus(tenant, namespace, sourceName)
+                            : admin.source().getSourceStatus(tenant, namespace, sourceName,
+                                    Integer.parseInt(instanceId)));
+            // Re-parse the JSON text so it can be emitted with pretty-printing enabled.
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(new JsonParser().parse(json)));
+        }
+    }
+
+    /**
+     * CLI command that restarts a source: the single instance named by
+     * {@code --instance-id} when it is supplied, otherwise all instances.
+     */
+    @Parameters(commandDescription = "Restart source instance")
+    class RestartSource extends SourceCommand {
+
+        @Parameter(names = "--instance-id", description = "The source instanceId (restart all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        /**
+         * Sends the restart request through the admin client and reports success.
+         * A non-numeric {@code --instance-id} is reported on stderr and no request is sent.
+         *
+         * @throws Exception any exception raised by the admin client call
+         */
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                final int instance;
+                try {
+                    instance = Integer.parseInt(instanceId);
+                } catch (NumberFormatException e) {
+                    // Nothing was restarted, so do not fall through to the success message.
+                    System.err.println("instance-id must be a number");
+                    return;
+                }
+                admin.source().restartSource(tenant, namespace, sourceName, instance);
+            } else {
+                admin.source().restartSource(tenant, namespace, sourceName);
+            }
+            System.out.println("Restarted successfully");
+        }
+    }
+
+    /**
+     * CLI command that stops a source: the single instance named by
+     * {@code --instance-id} when it is supplied, otherwise all instances.
+     */
+    @Parameters(commandDescription = "Temporarily stops source instance. (If worker restarts then it reassigns and starts source again)")
+    class StopSource extends SourceCommand {
+
+        @Parameter(names = "--instance-id", description = "The source instanceId (stop all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        /**
+         * Sends the stop request through the admin client and reports success.
+         * A non-numeric {@code --instance-id} is reported on stderr and no request is sent.
+         *
+         * @throws Exception any exception raised by the admin client call
+         */
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                final int instance;
+                try {
+                    instance = Integer.parseInt(instanceId);
+                } catch (NumberFormatException e) {
+                    // Nothing was stopped, so do not fall through to the success message.
+                    System.err.println("instance-id must be a number");
+                    return;
+                }
+                admin.source().stopSource(tenant, namespace, sourceName, instance);
+            } else {
+                admin.source().stopSource(tenant, namespace, sourceName);
+            }
+            // This is the stop command; the original printed the restart message here.
+            System.out.println("Stopped successfully");
+        }
+    }
+
@Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster")
public class ListBuiltInSources extends BaseCommand {
@Override
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 44bb3bd..0eebe79 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -403,8 +403,8 @@ public class FunctionsImpl {
return Response.status(Status.OK).entity(retval).build();
}
- public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
- final String instanceId, URI uri) throws IOException {
+ public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName,
+ final String componentType, final String instanceId, URI uri) throws IOException {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -412,23 +412,28 @@ public class FunctionsImpl {
// validate parameters
try {
- validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
+ validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
} catch (IllegalArgumentException e) {
- log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e);
+ log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+ log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+ .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+ }
+ FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+ return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
}
- FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
int instanceIdInt = Integer.parseInt(instanceId);
if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
- log.error("instanceId in getFunctionStatus out of bounds @ /{}/{}/{}", tenant, namespace, functionName);
+ log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(String.format("Invalid InstanceId"))).build();
}
@@ -436,12 +441,12 @@ public class FunctionsImpl {
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
FunctionStatus functionStatus = null;
try {
- functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName,
+ functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, componentName,
Integer.parseInt(instanceId), uri);
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
- log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName, e);
+ log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
}
@@ -449,18 +454,18 @@ public class FunctionsImpl {
return Response.status(Status.OK).entity(jsonResponse).build();
}
- public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName,
- final String instanceId, URI uri) {
- return stopFunctionInstance(tenant, namespace, functionName, instanceId, false, uri);
+ public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
+ final String componentType, final String instanceId, URI uri) {
+ return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, false, uri);
}
- public Response restartFunctionInstance(final String tenant, final String namespace, final String functionName,
- final String instanceId, URI uri) {
- return stopFunctionInstance(tenant, namespace, functionName, instanceId, true, uri);
+ public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName,
+ final String componentType, final String instanceId, URI uri) {
+ return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, true, uri);
}
- public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName,
- final String instanceId, boolean restart, URI uri) {
+ public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
+ final String componentType, final String instanceId, boolean restart, URI uri) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -468,42 +473,51 @@ public class FunctionsImpl {
// validate parameters
try {
- validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
+ validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
} catch (IllegalArgumentException e) {
- log.error("Invalid restart-function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+ log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+ log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+ .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+ }
+
+ FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+ return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
try {
- return functionRuntimeManager.stopFunctionInstance(tenant, namespace, functionName,
+ return functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName,
Integer.parseInt(instanceId), restart, uri);
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
- log.error("Failed to restart function: {}/{}/{}/{}", tenant, namespace, functionName, instanceId, e);
+ log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e);
return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
}
}
- public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName) {
- return stopFunctionInstances(tenant, namespace, functionName, false);
+ public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
+ final String componentType) {
+ return stopFunctionInstances(tenant, namespace, componentName, componentType, false);
}
- public Response restartFunctionInstances(final String tenant, final String namespace, final String functionName) {
- return stopFunctionInstances(tenant, namespace, functionName, true);
+ public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName,
+ final String componentType) {
+ return stopFunctionInstances(tenant, namespace, componentName, componentType, true);
}
- public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName,
- boolean restart) {
+ /**
+ * Stops (or, when {@code restart} is true, restarts) the instances of the named
+ * component through the FunctionRuntimeManager.
+ *
+ * @param tenant tenant of the component
+ * @param namespace namespace of the component
+ * @param componentName name of the function, source or sink
+ * @param componentType expected component kind; the request is answered with 404 when the
+ * type computed from the stored metadata differs
+ * @param restart {@code true} to restart, {@code false} to only stop
+ * @return the runtime manager's response; the "unavailable" response when the worker
+ * service is down; 400 for invalid parameters; 404 when the component is missing
+ * or of another type; 500 on any other failure
+ */
+ public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
+ final String componentType, boolean restart) {
+ // Name the requested operation so log lines do not claim "restart" for a plain stop.
+ final String action = restart ? "restart" : "stop";
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -511,32 +525,40 @@ public class FunctionsImpl {
// validate parameters
try {
- validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
+ validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+ log.error("Invalid {} {} request @ /{}/{}/{}", action, componentType, tenant, namespace, componentName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+ log.error("{} to {} does not exist @ /{}/{}/{}", componentType, action, tenant, namespace, componentName);
return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+ .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+ }
+
+ FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+ return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
try {
- return functionRuntimeManager.stopFunctionInstances(tenant, namespace, functionName, restart);
+ return functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart);
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
- log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName, e);
+ log.error("Failed to {} {}: {}/{}/{}", action, componentType, tenant, namespace, componentName, e);
return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
}
}
- public Response getFunctionStatus(final String tenant, final String namespace, final String functionName, URI uri)
+ public Response getFunctionStatus(final String tenant, final String namespace, final String componentName,
+ final String componentType, URI uri)
throws IOException {
if (!isWorkerServiceAvailable()) {
@@ -545,24 +567,31 @@ public class FunctionsImpl {
// validate parameters
try {
- validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
+ validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e);
+ log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+ log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+ .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+ }
+
+ FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+ return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
InstanceCommunication.FunctionStatusList functionStatusList = null;
try {
- functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName, uri);
+ functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, componentName, uri);
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
@@ -870,11 +899,11 @@ public class FunctionsImpl {
}
}
- private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName,
- String instanceId) throws IllegalArgumentException {
- validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
+ /**
+ * Validates the parameters of a per-instance request: runs the checks of
+ * validateGetFunctionRequestParams for the given component type, then additionally
+ * requires a non-null instance id.
+ *
+ * @throws IllegalArgumentException if the delegated validation fails or {@code instanceId}
+ * is null; the latter message starts with {@code componentType}
+ */
+ private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName,
+ String componentType, String instanceId) throws IllegalArgumentException {
+ validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
if (instanceId == null) {
- throw new IllegalArgumentException("Function Instance Id is not provided");
+ throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType));
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 405f88f..e37a88a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -107,7 +107,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
- tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
}
@GET
@@ -116,7 +116,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStatus(
- tenant, namespace, functionName, uri.getRequestUri());
+ tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
}
@GET
@@ -150,7 +150,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+ return functions.restartFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, this.uri.getRequestUri());
}
@POST
@@ -162,7 +162,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
@Consumes(MediaType.APPLICATION_JSON)
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
- return functions.restartFunctionInstances(tenant, namespace, functionName);
+ return functions.restartFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
}
@POST
@@ -175,7 +175,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+ return functions.stopFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, this.uri.getRequestUri());
}
@POST
@@ -187,7 +187,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
@Consumes(MediaType.APPLICATION_JSON)
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
- return functions.stopFunctionInstances(tenant, namespace, functionName);
+ return functions.stopFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
}
@POST
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 488f47d..934b0fc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -97,7 +97,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
- tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
}
@GET
@@ -105,7 +105,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
public Response getSinkStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) throws IOException {
- return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
+ return functions.getFunctionStatus(tenant, namespace, sinkName, FunctionsImpl.SINK, uri.getRequestUri());
}
@GET
@@ -126,7 +126,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
public Response restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+ return functions.restartFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, this.uri.getRequestUri());
}
@POST
@@ -138,7 +138,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
@Consumes(MediaType.APPLICATION_JSON)
public Response restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
- return functions.restartFunctionInstances(tenant, namespace, sinkName);
+ return functions.restartFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
}
@POST
@@ -151,7 +151,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
public Response stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+ return functions.stopFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, this.uri.getRequestUri());
}
@POST
@@ -163,7 +163,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
@Consumes(MediaType.APPLICATION_JSON)
public Response stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
- return functions.stopFunctionInstances(tenant, namespace, sinkName);
+ return functions.stopFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
}
@GET
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 3b1222e..2c39344 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -97,7 +97,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
- tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
}
@GET
@@ -105,7 +105,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
public Response getSourceStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) throws IOException {
- return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri());
+ return functions.getFunctionStatus(tenant, namespace, sourceName, FunctionsImpl.SOURCE, uri.getRequestUri());
}
@GET
@@ -126,7 +126,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
public Response restartSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+ return functions.restartFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, this.uri.getRequestUri());
}
@POST
@@ -138,7 +138,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
@Consumes(MediaType.APPLICATION_JSON)
public Response restartSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
- return functions.restartFunctionInstances(tenant, namespace, sourceName);
+ return functions.restartFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
}
@POST
@@ -151,7 +151,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
public Response stopSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+ return functions.stopFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, this.uri.getRequestUri());
}
@POST
@@ -163,7 +163,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
@Consumes(MediaType.APPLICATION_JSON)
public Response stopSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
- return functions.stopFunctionInstances(tenant, namespace, sourceName);
+ return functions.stopFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
}
@GET
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 6659fcf..0222bd6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -154,7 +154,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
// wait for sink to process messages
- waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
+ waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
// validate the sink result
tester.validateSinkResult(kvs);
@@ -238,7 +238,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
protected void getSinkStatus(String tenant, String namespace, String sinkName) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
- "functions",
+ "sink",
"getstatus",
"--tenant", tenant,
"--namespace", namespace,
@@ -254,7 +254,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
} catch (ContainerExecException e) {
// expected in early iterations
}
- log.info("Backoff 1 second until the function is running");
+ log.info("Backoff 1 second until the sink is running");
TimeUnit.SECONDS.sleep(1);
}
}
@@ -482,7 +482,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
protected void getSourceStatus(String tenant, String namespace, String sourceName) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
- "functions",
+ "source",
"getstatus",
"--tenant", tenant,
"--namespace", namespace,
@@ -518,7 +518,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
int numMessages) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
- "functions",
+ "source",
"getstatus",
"--tenant", tenant,
"--namespace", namespace,
@@ -541,6 +541,35 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
}
+ /**
+ * Polls the admin script's {@code sink getstatus} command on any worker, once per second,
+ * until its stdout contains a {@code numProcessed} entry equal to {@code numMessages}.
+ * A failing command is tolerated and simply retried. There is no upper bound on the wait.
+ */
+ protected void waitForProcessingSinkMessages(String tenant, String namespace, String sinkName,
+ int numMessages) throws Exception {
+ final String[] statusCmd = new String[] {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ // Text that appears in the status output once the expected count has been reached.
+ final String processedMarker = "\"numProcessed\": \"" + numMessages + "\"";
+ final Stopwatch timer = Stopwatch.createStarted();
+ for (;;) {
+ try {
+ String stdout = pulsarCluster.getAnyWorker().execCmd(statusCmd).getStdout();
+ log.info("Get sink status : {}", stdout);
+ if (stdout.contains(processedMarker)) {
+ return;
+ }
+ } catch (ContainerExecException ignored) {
+ // the status command is expected to fail during early iterations
+ }
+ long elapsedMs = timer.elapsed(TimeUnit.MILLISECONDS);
+ log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second",
+ elapsedMs, numMessages);
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
protected void deleteSource(String tenant, String namespace, String sourceName) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,