You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/19 04:59:32 UTC
[pulsar] branch master updated: Source/Sink Endpoint validations
(#2807)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1386e6d Source/Sink Endpoint validations (#2807)
1386e6d is described below
commit 1386e6dfdba16bb1ee0a3c7649cb7daed50a5d8c
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Oct 18 21:59:27 2018 -0700
Source/Sink Endpoint validations (#2807)
* Added Get and List source/sink functionality
* Fixed compile
* Removed test that doesnt make sense any more
* Fixed build
* Fixed logic
* Return error response
* Return response on error
* Fix unittest
* Fixed unittest
* Fixed unittest
* Fixed unittest
* Added get/list sinks tests
* Added get/list tests
* Add more unittests
* Added more unittests
* Added TODO
* Took feedback
* Fix unittest
* Fix unittest
* Fix unittest
* Fixed integration tests
* Fixed integration test
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 10 +-
.../apache/pulsar/broker/admin/impl/SinkBase.java | 10 +-
.../pulsar/broker/admin/impl/SourceBase.java | 10 +-
.../worker/PulsarWorkerAssignmentTest.java | 2 +-
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 30 ---
.../org/apache/pulsar/client/admin/Functions.java | 3 +-
.../java/org/apache/pulsar/client/admin/Sink.java | 6 +-
.../org/apache/pulsar/client/admin/Source.java | 6 +-
.../client/admin/internal/FunctionsImpl.java | 8 +-
.../pulsar/client/admin/internal/SinkImpl.java | 12 +-
.../pulsar/client/admin/internal/SourceImpl.java | 12 +-
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 4 +-
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 86 +++++--
.../org/apache/pulsar/admin/cli/CmdSources.java | 84 +++++--
.../org/apache/pulsar/admin/cli/TestCmdSinks.java | 8 +-
.../apache/pulsar/admin/cli/TestCmdSources.java | 8 +-
.../functions/utils/FunctionConfigUtils.java | 82 +++++++
.../pulsar/functions/utils/SinkConfigUtils.java | 57 +++++
.../pulsar/functions/utils/SourceConfigUtils.java | 44 +++-
.../org/apache/pulsar/functions/utils/Utils.java | 20 ++
.../functions/utils/FunctionConfigUtilsTest.java | 61 +++++
.../functions/utils/SinkConfigUtilsTest.java | 59 +++++
.../functions/utils/SourceConfigUtilsTest.java | 53 +++++
.../functions/worker/FunctionMetaDataManager.java | 6 +-
.../functions/worker/rest/api/FunctionsImpl.java | 252 ++++++++++++---------
.../worker/rest/api/v2/FunctionApiV2Resource.java | 11 +-
.../worker/rest/api/v2/SinkApiV2Resource.java | 11 +-
.../worker/rest/api/v2/SourceApiV2Resource.java | 11 +-
.../worker/FunctionMetaDataManagerTest.java | 23 +-
.../rest/api/v2/FunctionApiV2ResourceTest.java | 80 +++++--
.../worker/rest/api/v2/SinkApiV2ResourceTest.java | 168 ++++++++------
.../rest/api/v2/SourceApiV2ResourceTest.java | 143 +++++++-----
.../integration/functions/PulsarFunctionsTest.java | 16 +-
33 files changed, 984 insertions(+), 412 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index def4b2b..b50da21 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -84,7 +84,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("functionConfig") String functionConfigJson) {
return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+ functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
}
@PUT
@@ -106,7 +106,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("functionConfig") String functionConfigJson) {
return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+ functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
}
@@ -124,7 +124,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public Response deregisterFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+ return functions.deregisterFunction(tenant, namespace, functionName, FunctionsImpl.FUNCTION, clientAppId());
}
@GET
@@ -143,7 +143,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionInfo(
- tenant, namespace, functionName);
+ tenant, namespace, functionName, FunctionsImpl.FUNCTION);
}
@GET
@@ -196,7 +196,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public Response listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
return functions.listFunctions(
- tenant, namespace);
+ tenant, namespace, FunctionsImpl.FUNCTION);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 2fe3989..0f5a5c5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -72,7 +72,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+ functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
}
@PUT
@@ -93,7 +93,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+ functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
}
@@ -111,7 +111,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public Response deregisterSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
- return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+ return functions.deregisterFunction(tenant, namespace, sinkName, FunctionsImpl.SINK, clientAppId());
}
@GET
@@ -129,7 +129,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public Response getSinkInfo(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) throws IOException {
- return functions.getFunctionInfo(tenant, namespace, sinkName);
+ return functions.getFunctionInfo(tenant, namespace, sinkName, FunctionsImpl.SINK);
}
@GET
@@ -180,7 +180,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
@Path("/{tenant}/{namespace}")
public Response listSinks(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return functions.listFunctions(tenant, namespace);
+ return functions.listFunctions(tenant, namespace, FunctionsImpl.SINK);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 1a82ac2..4bda489 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -73,7 +73,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @FormDataParam("sourceConfig") String sourceConfigJson) {
return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+ functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
}
@PUT
@@ -94,7 +94,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @FormDataParam("sourceConfig") String sourceConfigJson) {
return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+ functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
}
@@ -112,7 +112,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public Response deregisterSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
- return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+ return functions.deregisterFunction(tenant, namespace, sourceName, FunctionsImpl.SOURCE, clientAppId());
}
@GET
@@ -131,7 +131,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) throws IOException {
return functions.getFunctionInfo(
- tenant, namespace, sourceName);
+ tenant, namespace, sourceName, FunctionsImpl.SOURCE);
}
@GET
@@ -183,7 +183,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public Response listSources(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
return functions.listFunctions(
- tenant, namespace);
+ tenant, namespace, FunctionsImpl.SOURCE);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index afa7ef1..acd6ed3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -308,7 +308,7 @@ public class PulsarWorkerAssignmentTest {
// validate updated function prop = auto-ack=false and instnaceid
for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) {
String functionName = baseFunctionName + i;
- assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck());
+ assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).isAutoAck());
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index babcdd6..21133d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -407,36 +407,6 @@ public class PulsarFunctionE2ETest {
}
}
- /**
- * Test to verify: function-server loads jar using file-url and derives type-args classes if not provided
- * @throws Exception
- */
- @Test(timeOut = 20000)
- public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
-
- final String namespacePortion = "io";
- final String replNamespace = tenant + "/" + namespacePortion;
- final String sinkTopic = "persistent://" + replNamespace + "/output";
- final String functionName = "PulsarSink-test";
- final String subscriptionName = "test-sub";
- admin.namespaces().createNamespace(replNamespace);
- Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
- admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
-
- String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
-
- FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
- "my.*", sinkTopic, subscriptionName);
-
- admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
-
- FunctionDetails functionMetadata = admin.source().getSource(tenant, namespacePortion, functionName);
-
- assertEquals(functionMetadata.getSource().getTypeClassName(), String.class.getName());
- assertEquals(functionMetadata.getSink().getTypeClassName(), String.class.getName());
-
- }
-
@Test(timeOut = 20000)
public void testFunctionStopAndRestartApi() throws Exception {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 10c890c..a722605 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -25,7 +25,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -77,7 +76,7 @@ public interface Functions {
* @throws PulsarAdminException
* Unexpected error
*/
- FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException;
+ FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException;
/**
* Create a new function.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
index 3f8fe2f..afad6f3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
@@ -22,13 +22,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.SinkConfig;
import java.util.List;
-import java.util.Set;
/**
* Admin interface for Sink management.
@@ -50,7 +48,7 @@ public interface Sink {
* @throws PulsarAdminException
* Unexpected error
*/
- List<String> getSinks(String tenant, String namespace) throws PulsarAdminException;
+ List<String> listSinks(String tenant, String namespace) throws PulsarAdminException;
/**
* Get the configuration for the specified sink.
@@ -77,7 +75,7 @@ public interface Sink {
* @throws PulsarAdminException
* Unexpected error
*/
- Function.FunctionDetails getSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+ SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException;
/**
* Create a new sink.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
index 3c43cf2..9d1a318 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
@@ -22,13 +22,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.SourceConfig;
import java.util.List;
-import java.util.Set;
/**
* Admin interface for Source management.
@@ -50,7 +48,7 @@ public interface Source {
* @throws PulsarAdminException
* Unexpected error
*/
- List<String> getSources(String tenant, String namespace) throws PulsarAdminException;
+ List<String> listSources(String tenant, String namespace) throws PulsarAdminException;
/**
* Get the configuration for the specified source.
@@ -77,7 +75,7 @@ public interface Source {
* @throws PulsarAdminException
* Unexpected error
*/
- Function.FunctionDetails getSource(String tenant, String namespace, String source) throws PulsarAdminException;
+ SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException;
/**
* Create a new source.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index b9ea1a5..77cc3d6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -46,7 +46,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -80,16 +79,13 @@ public class FunctionsImpl extends BaseResource implements Functions {
}
@Override
- public FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
+ public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
try {
Response response = request(functions.path(tenant).path(namespace).path(function)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
- String jsonResponse = response.readEntity(String.class);
- FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
- mergeJson(jsonResponse, functionDetailsBuilder);
- return functionDetailsBuilder.build();
+ return response.readEntity(FunctionConfig.class);
} catch (Exception e) {
throw getApiException(e);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 4e13693..d117374 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -27,7 +27,6 @@ import org.apache.pulsar.client.admin.Sink;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.SinkConfig;
@@ -44,8 +43,6 @@ import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
@Slf4j
public class SinkImpl extends BaseResource implements Sink {
@@ -58,7 +55,7 @@ public class SinkImpl extends BaseResource implements Sink {
}
@Override
- public List<String> getSinks(String tenant, String namespace) throws PulsarAdminException {
+ public List<String> listSinks(String tenant, String namespace) throws PulsarAdminException {
try {
Response response = request(sink.path(tenant).path(namespace)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
@@ -72,16 +69,13 @@ public class SinkImpl extends BaseResource implements Sink {
}
@Override
- public FunctionDetails getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
+ public SinkConfig getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
try {
Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
- String jsonResponse = response.readEntity(String.class);
- FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
- mergeJson(jsonResponse, functionDetailsBuilder);
- return functionDetailsBuilder.build();
+ return response.readEntity(SinkConfig.class);
} catch (Exception e) {
throw getApiException(e);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index 65a2bfc..0c7a1df 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -27,7 +27,6 @@ import org.apache.pulsar.client.admin.Source;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.SourceConfig;
@@ -44,8 +43,6 @@ import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
@Slf4j
public class SourceImpl extends BaseResource implements Source {
@@ -58,7 +55,7 @@ public class SourceImpl extends BaseResource implements Source {
}
@Override
- public List<String> getSources(String tenant, String namespace) throws PulsarAdminException {
+ public List<String> listSources(String tenant, String namespace) throws PulsarAdminException {
try {
Response response = request(source.path(tenant).path(namespace)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
@@ -72,16 +69,13 @@ public class SourceImpl extends BaseResource implements Source {
}
@Override
- public FunctionDetails getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+ public SourceConfig getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
try {
Response response = request(source.path(tenant).path(namespace).path(sourceName)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
- String jsonResponse = response.readEntity(String.class);
- FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
- mergeJson(jsonResponse, functionDetailsBuilder);
- return functionDetailsBuilder.build();
+ return response.readEntity(SourceConfig.class);
} catch (Exception e) {
throw getApiException(e);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 2f23a63..7e44b44 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -697,9 +697,9 @@ public class CmdFunctions extends CmdBase {
class GetFunction extends FunctionCommand {
@Override
void runCmd() throws Exception {
- String json = Utils.printJson(admin.functions().getFunction(tenant, namespace, functionName));
+ FunctionConfig functionConfig = admin.functions().getFunction(tenant, namespace, functionName);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
- System.out.println(gson.toJson(new JsonParser().parse(json)));
+ System.out.println(gson.toJson(functionConfig));
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index e56a343..3b9159e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -30,16 +30,14 @@ import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
import lombok.Getter;
@@ -67,6 +65,8 @@ public class CmdSinks extends CmdBase {
private final CreateSink createSink;
private final UpdateSink updateSink;
private final DeleteSink deleteSink;
+ private final ListSinks listSinks;
+ private final GetSink getSink;
private final LocalSinkRunner localSinkRunner;
public CmdSinks(PulsarAdmin admin) {
@@ -74,11 +74,15 @@ public class CmdSinks extends CmdBase {
createSink = new CreateSink();
updateSink = new UpdateSink();
deleteSink = new DeleteSink();
+ listSinks = new ListSinks();
+ getSink = new GetSink();
localSinkRunner = new LocalSinkRunner();
jcommander.addCommand("create", createSink);
jcommander.addCommand("update", updateSink);
jcommander.addCommand("delete", deleteSink);
+ jcommander.addCommand("list", listSinks);
+ jcommander.addCommand("get", getSink);
jcommander.addCommand("localrun", localSinkRunner);
jcommander.addCommand("available-sinks", new ListBuiltInSinks());
}
@@ -184,7 +188,7 @@ public class CmdSinks extends CmdBase {
}
@Parameters(commandDescription = "Submit a Pulsar IO sink connector to run in a Pulsar cluster")
- protected class CreateSink extends SinkCommand {
+ protected class CreateSink extends SinkDetailsCommand {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(archive)) {
@@ -197,7 +201,7 @@ public class CmdSinks extends CmdBase {
}
@Parameters(commandDescription = "Update a Pulsar IO sink connector")
- protected class UpdateSink extends SinkCommand {
+ protected class UpdateSink extends SinkDetailsCommand {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(archive)) {
@@ -209,7 +213,7 @@ public class CmdSinks extends CmdBase {
}
}
- abstract class SinkCommand extends BaseCommand {
+ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--tenant", description = "The sink's tenant")
protected String tenant;
@Parameter(names = "--namespace", description = "The sink's namespace")
@@ -506,25 +510,70 @@ public class CmdSinks extends CmdBase {
}
}
- @Parameters(commandDescription = "Stops a Pulsar IO sink connector")
- protected class DeleteSink extends BaseCommand {
-
- @Parameter(names = "--tenant", description = "The tenant of the sink")
+ /**
+ * Sink level command
+ */
+ @Getter
+ abstract class SinkCommand extends BaseCommand {
+ @Parameter(names = "--tenant", description = "The sink's tenant")
protected String tenant;
- @Parameter(names = "--namespace", description = "The namespace of the sink")
+ @Parameter(names = "--namespace", description = "The sink's namespace")
protected String namespace;
- @Parameter(names = "--name", description = "The name of the sink")
- protected String name;
+ @Parameter(names = "--name", description = "The sink's name")
+ protected String sinkName;
@Override
void processArguments() throws Exception {
super.processArguments();
- if (null == name) {
- throw new ParameterException(
+ if (tenant == null) {
+ tenant = PUBLIC_TENANT;
+ }
+ if (namespace == null) {
+ namespace = DEFAULT_NAMESPACE;
+ }
+ if (null == sinkName) {
+ throw new RuntimeException(
"You must specify a name for the sink");
}
+ }
+ }
+
+ @Parameters(commandDescription = "Stops a Pulsar IO sink connector")
+ protected class DeleteSink extends SinkCommand {
+
+ @Override
+ void runCmd() throws Exception {
+ admin.sink().deleteSink(tenant, namespace, sinkName);
+ print("Deleted successfully");
+ }
+ }
+
+ @Parameters(commandDescription = "Gets the information about a Pulsar IO sink connector")
+ protected class GetSink extends SinkCommand {
+
+ @Override
+ void runCmd() throws Exception {
+ SinkConfig sinkConfig = admin.sink().getSink(tenant, namespace, sinkName);
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ System.out.println(gson.toJson(sinkConfig));
+ }
+ }
+
+ /**
+ * List Sources command
+ */
+ @Parameters(commandDescription = "List all running Pulsar IO sink connectors")
+ protected class ListSinks extends BaseCommand {
+ @Parameter(names = "--tenant", description = "The sink's tenant")
+ protected String tenant;
+
+ @Parameter(names = "--namespace", description = "The sink's namespace")
+ protected String namespace;
+
+ @Override
+ public void processArguments() {
if (tenant == null) {
tenant = PUBLIC_TENANT;
}
@@ -535,8 +584,9 @@ public class CmdSinks extends CmdBase {
@Override
void runCmd() throws Exception {
- admin.sink().deleteSink(tenant, namespace, name);
- print("Deleted successfully");
+ List<String> sinks = admin.sink().listSinks(tenant, namespace);
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ System.out.println(gson.toJson(sinks));
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index f2d7681..f27b0a5 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -28,6 +28,7 @@ import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import java.io.File;
@@ -35,6 +36,7 @@ import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Paths;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -66,6 +68,8 @@ public class CmdSources extends CmdBase {
private final CreateSource createSource;
private final DeleteSource deleteSource;
+ private final GetSource getSource;
+ private final ListSources listSources;
private final UpdateSource updateSource;
private final LocalSourceRunner localSourceRunner;
@@ -74,11 +78,15 @@ public class CmdSources extends CmdBase {
createSource = new CreateSource();
updateSource = new UpdateSource();
deleteSource = new DeleteSource();
+ listSources = new ListSources();
+ getSource = new GetSource();
localSourceRunner = new LocalSourceRunner();
jcommander.addCommand("create", createSource);
jcommander.addCommand("update", updateSource);
jcommander.addCommand("delete", deleteSource);
+ jcommander.addCommand("get", getSource);
+ jcommander.addCommand("list", listSources);
jcommander.addCommand("localrun", localSourceRunner);
jcommander.addCommand("available-sources", new ListBuiltInSources());
}
@@ -184,7 +192,7 @@ public class CmdSources extends CmdBase {
}
@Parameters(commandDescription = "Submit a Pulsar IO source connector to run in a Pulsar cluster")
- protected class CreateSource extends SourceCommand {
+ protected class CreateSource extends SourceDetailsCommand {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) {
@@ -197,7 +205,7 @@ public class CmdSources extends CmdBase {
}
@Parameters(commandDescription = "Update a Pulsar IO source connector")
- protected class UpdateSource extends SourceCommand {
+ protected class UpdateSource extends SourceDetailsCommand {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
@@ -209,7 +217,7 @@ public class CmdSources extends CmdBase {
}
}
- abstract class SourceCommand extends BaseCommand {
+ abstract class SourceDetailsCommand extends BaseCommand {
@Parameter(names = "--tenant", description = "The source's tenant")
protected String tenant;
@Parameter(names = "--namespace", description = "The source's namespace")
@@ -460,40 +468,86 @@ public class CmdSources extends CmdBase {
}
}
- @Parameters(commandDescription = "Stops a Pulsar IO source connector")
- protected class DeleteSource extends BaseCommand {
-
- @Parameter(names = "--tenant", description = "The tenant of a sink or source")
+ /**
+ * Function level command
+ */
+ @Getter
+ abstract class SourceCommand extends BaseCommand {
+ @Parameter(names = "--tenant", description = "The source's tenant")
protected String tenant;
- @Parameter(names = "--namespace", description = "The namespace of a sink or source")
+ @Parameter(names = "--namespace", description = "The source's namespace")
protected String namespace;
- @Parameter(names = "--name", description = "The name of a sink or source")
- protected String name;
+ @Parameter(names = "--name", description = "The source's name")
+ protected String sourceName;
@Override
void processArguments() throws Exception {
super.processArguments();
- if (null == name) {
- throw new ParameterException(
- "You must specify a name for the source");
- }
if (tenant == null) {
tenant = PUBLIC_TENANT;
}
if (namespace == null) {
namespace = DEFAULT_NAMESPACE;
}
+ if (null == sourceName) {
+ throw new RuntimeException(
+ "You must specify a name for the source");
+ }
}
+ }
+
+ @Parameters(commandDescription = "Stops a Pulsar IO source connector")
+ protected class DeleteSource extends SourceCommand {
@Override
void runCmd() throws Exception {
- admin.source().deleteSource(tenant, namespace, name);
+ admin.source().deleteSource(tenant, namespace, sourceName);
print("Delete source successfully");
}
}
+ @Parameters(commandDescription = "Gets the information about a Pulsar IO source connector")
+ protected class GetSource extends SourceCommand {
+
+ @Override
+ void runCmd() throws Exception {
+ SourceConfig sourceConfig = admin.source().getSource(tenant, namespace, sourceName);
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ System.out.println(gson.toJson(sourceConfig));
+ }
+ }
+
+ /**
+ * List Sources command
+ */
+ @Parameters(commandDescription = "List all running Pulsar IO source connectors")
+ protected class ListSources extends BaseCommand {
+ @Parameter(names = "--tenant", description = "The sink's tenant")
+ protected String tenant;
+
+ @Parameter(names = "--namespace", description = "The sink's namespace")
+ protected String namespace;
+
+ @Override
+ public void processArguments() {
+ if (tenant == null) {
+ tenant = PUBLIC_TENANT;
+ }
+ if (namespace == null) {
+ namespace = DEFAULT_NAMESPACE;
+ }
+ }
+
+ @Override
+ void runCmd() throws Exception {
+ List<String> sources = admin.source().listSources(tenant, namespace);
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ System.out.println(gson.toJson(sources));
+ }
+ }
+
@Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster")
public class ListBuiltInSources extends BaseCommand {
@Override
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 0c56145..b52bc17 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -962,7 +962,7 @@ public class TestCmdSinks {
public void testDeleteMissingTenant() throws Exception {
deleteSink.tenant = null;
deleteSink.namespace = NAMESPACE;
- deleteSink.name = NAME;
+ deleteSink.sinkName = NAME;
deleteSink.processArguments();
@@ -975,7 +975,7 @@ public class TestCmdSinks {
public void testDeleteMissingNamespace() throws Exception {
deleteSink.tenant = TENANT;
deleteSink.namespace = null;
- deleteSink.name = NAME;
+ deleteSink.sinkName = NAME;
deleteSink.processArguments();
@@ -984,11 +984,11 @@ public class TestCmdSinks {
verify(sink).deleteSink(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
}
- @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink")
+ @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink")
public void testDeleteMissingName() throws Exception {
deleteSink.tenant = TENANT;
deleteSink.namespace = NAMESPACE;
- deleteSink.name = null;
+ deleteSink.sinkName = null;
deleteSink.processArguments();
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index fe799bc..4a3b3cc 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -820,7 +820,7 @@ public class TestCmdSources {
public void testDeleteMissingTenant() throws Exception {
deleteSource.tenant = null;
deleteSource.namespace = NAMESPACE;
- deleteSource.name = NAME;
+ deleteSource.sourceName = NAME;
deleteSource.processArguments();
@@ -833,7 +833,7 @@ public class TestCmdSources {
public void testDeleteMissingNamespace() throws Exception {
deleteSource.tenant = TENANT;
deleteSource.namespace = null;
- deleteSource.name = NAME;
+ deleteSource.sourceName = NAME;
deleteSource.processArguments();
@@ -842,11 +842,11 @@ public class TestCmdSources {
verify(source).deleteSource(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
}
- @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the source")
+ @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "You must specify a name for the source")
public void testDeleteMissingName() throws Exception {
deleteSource.tenant = TENANT;
deleteSource.namespace = NAMESPACE;
- deleteSource.name = null;
+ deleteSource.sourceName = null;
deleteSource.processArguments();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index cf182a8..11b623d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -20,15 +20,18 @@
package org.apache.pulsar.functions.utils;
import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
public class FunctionConfigUtils {
@@ -195,4 +198,83 @@ public class FunctionConfigUtils {
}
return functionDetailsBuilder.build();
}
+
+ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) {
+ FunctionConfig functionConfig = new FunctionConfig();
+ functionConfig.setTenant(functionDetails.getTenant());
+ functionConfig.setNamespace(functionDetails.getNamespace());
+ functionConfig.setName(functionDetails.getName());
+ functionConfig.setParallelism(functionDetails.getParallelism());
+ functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
+ for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
+ ConsumerConfig consumerConfig = new ConsumerConfig();
+ if (!isEmpty(input.getValue().getSerdeClassName())) {
+ consumerConfig.setSerdeClassName(input.getValue().getSerdeClassName());
+ }
+ if (!isEmpty(input.getValue().getSchemaType())) {
+ consumerConfig.setSchemaType(input.getValue().getSchemaType());
+ }
+ consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
+ consumerConfigMap.put(input.getKey(), consumerConfig);
+ }
+ functionConfig.setInputSpecs(consumerConfigMap);
+ if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
+ functionConfig.setSubName(functionDetails.getSource().getSubscriptionName());
+ }
+ if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) {
+ functionConfig.setRetainOrdering(true);
+ functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+ } else {
+ functionConfig.setRetainOrdering(false);
+ functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ }
+ functionConfig.setAutoAck(functionDetails.getAutoAck());
+ functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+ if (!isEmpty(functionDetails.getSink().getTopic())) {
+ functionConfig.setOutput(functionDetails.getSink().getTopic());
+ }
+ if (!isEmpty(functionDetails.getSink().getSerDeClassName())) {
+ functionConfig.setOutputSerdeClassName(functionDetails.getSink().getSerDeClassName());
+ }
+ if (!isEmpty(functionDetails.getSink().getSchemaType())) {
+ functionConfig.setOutputSchemaType(functionDetails.getSink().getSchemaType());
+ }
+ if (!isEmpty(functionDetails.getLogTopic())) {
+ functionConfig.setLogTopic(functionDetails.getLogTopic());
+ }
+ functionConfig.setRuntime(Utils.convertRuntime(functionDetails.getRuntime()));
+ functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ if (functionDetails.hasRetryDetails()) {
+ functionConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
+ if (!isEmpty(functionDetails.getRetryDetails().getDeadLetterTopic())) {
+ functionConfig.setDeadLetterTopic(functionDetails.getRetryDetails().getDeadLetterTopic());
+ }
+ }
+ Map<String, Object> userConfig;
+ if (!isEmpty(functionDetails.getUserConfig())) {
+ Type type = new TypeToken<Map<String, Object>>() {}.getType();
+ userConfig = new Gson().fromJson(functionDetails.getUserConfig(), type);
+ } else {
+ userConfig = new HashMap<>();
+ }
+ if (userConfig.containsKey(WindowConfig.WINDOW_CONFIG_KEY)) {
+ WindowConfig windowConfig = (WindowConfig) userConfig.get(WindowConfig.WINDOW_CONFIG_KEY);
+ userConfig.remove(WindowConfig.WINDOW_CONFIG_KEY);
+ functionConfig.setClassName(windowConfig.getActualWindowFunctionClassName());
+ functionConfig.setWindowConfig(windowConfig);
+ } else {
+ functionConfig.setClassName(functionDetails.getClassName());
+ }
+ functionConfig.setUserConfig(userConfig);
+
+ if (functionDetails.hasResources()) {
+ Resources resources = new Resources();
+ resources.setCpu(functionDetails.getResources().getCpu());
+ resources.setRam(functionDetails.getResources().getRam());
+ resources.setDisk(functionDetails.getResources().getDisk());
+ }
+
+ return functionConfig;
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 95803ab..545d344 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.functions.utils;
import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -29,9 +30,13 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Type;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
@@ -175,4 +180,56 @@ public class SinkConfigUtils {
}
return functionDetailsBuilder.build();
}
+
+ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
+ SinkConfig sinkConfig = new SinkConfig();
+ sinkConfig.setTenant(functionDetails.getTenant());
+ sinkConfig.setNamespace(functionDetails.getNamespace());
+ sinkConfig.setName(functionDetails.getName());
+ sinkConfig.setParallelism(functionDetails.getParallelism());
+ sinkConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
+ for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
+ ConsumerConfig consumerConfig = new ConsumerConfig();
+ if (!isEmpty(input.getValue().getSerdeClassName())) {
+ consumerConfig.setSerdeClassName(input.getValue().getSerdeClassName());
+ }
+ if (!isEmpty(input.getValue().getSchemaType())) {
+ consumerConfig.setSchemaType(input.getValue().getSchemaType());
+ }
+ consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
+ consumerConfigMap.put(input.getKey(), consumerConfig);
+ }
+ sinkConfig.setInputSpecs(consumerConfigMap);
+ if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
+ sinkConfig.setSourceSubscriptionName(functionDetails.getSource().getSubscriptionName());
+ }
+ if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) {
+ sinkConfig.setRetainOrdering(true);
+ sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+ } else {
+ sinkConfig.setRetainOrdering(false);
+ sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ }
+ sinkConfig.setAutoAck(functionDetails.getAutoAck());
+ sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+ if (!isEmpty(functionDetails.getSink().getClassName())) {
+ sinkConfig.setClassName(functionDetails.getSink().getClassName());
+ }
+ if (!isEmpty(functionDetails.getSink().getBuiltin())) {
+ sinkConfig.setArchive("builtin://" + functionDetails.getSink().getBuiltin());
+ }
+ if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getSink().getConfigs())) {
+ Type type = new TypeToken<Map<String, String>>() {}.getType();
+ sinkConfig.setConfigs(new Gson().fromJson(functionDetails.getSink().getConfigs(), type));
+ }
+ if (functionDetails.hasResources()) {
+ Resources resources = new Resources();
+ resources.setCpu(functionDetails.getResources().getCpu());
+ resources.setRam(functionDetails.getResources().getRam());
+ resources.setDisk(functionDetails.getResources().getDisk());
+ }
+
+ return sinkConfig;
+ }
}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index a132c8a..3424062 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -20,6 +20,8 @@
package org.apache.pulsar.functions.utils;
import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
@@ -27,6 +29,8 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.Map;
import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.Utils.getSourceType;
@@ -41,10 +45,10 @@ public class SourceConfigUtils {
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
- boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN);
+ boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.BUILTIN);
if (!isBuiltin) {
- if (sourceConfig.getArchive().startsWith(Utils.FILE)) {
+ if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.FILE)) {
if (org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
}
@@ -127,4 +131,40 @@ public class SourceConfigUtils {
return functionDetailsBuilder.build();
}
+
+ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
+ SourceConfig sourceConfig = new SourceConfig();
+ sourceConfig.setTenant(functionDetails.getTenant());
+ sourceConfig.setNamespace(functionDetails.getNamespace());
+ sourceConfig.setName(functionDetails.getName());
+ sourceConfig.setParallelism(functionDetails.getParallelism());
+ sourceConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ Function.SourceSpec sourceSpec = functionDetails.getSource();
+ if (!StringUtils.isEmpty(sourceSpec.getClassName())) {
+ sourceConfig.setClassName(sourceSpec.getClassName());
+ }
+ if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+ sourceConfig.setArchive("builtin://" + sourceSpec.getBuiltin());
+ }
+ if (!StringUtils.isEmpty(sourceSpec.getConfigs())) {
+ Type type = new TypeToken<Map<String, String>>() {}.getType();
+ sourceConfig.setConfigs(new Gson().fromJson(sourceSpec.getConfigs(), type));
+ }
+ Function.SinkSpec sinkSpec = functionDetails.getSink();
+ sourceConfig.setTopicName(sinkSpec.getTopic());
+ if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {
+ sourceConfig.setSchemaType(sinkSpec.getSchemaType());
+ }
+ if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
+ sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
+ }
+ if (functionDetails.hasResources()) {
+ Resources resources = new Resources();
+ resources.setCpu(functionDetails.getResources().getCpu());
+ resources.setRam(functionDetails.getResources().getRam());
+ resources.setDisk(functionDetails.getResources().getDisk());
+ sourceConfig.setResources(resources);
+ }
+ return sourceConfig;
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index d35be61..adeaee1 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -151,6 +151,15 @@ public class Utils {
throw new RuntimeException("Unrecognized runtime: " + runtime.name());
}
+ public static FunctionConfig.Runtime convertRuntime(Runtime runtime) {
+ for (FunctionConfig.Runtime type : FunctionConfig.Runtime.values()) {
+ if (type.name().equals(runtime.name())) {
+ return type;
+ }
+ }
+ throw new RuntimeException("Unrecognized runtime: " + runtime.name());
+ }
+
public static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees convertProcessingGuarantee(
FunctionConfig.ProcessingGuarantees processingGuarantees) {
for (org.apache.pulsar.functions.proto.Function.ProcessingGuarantees type : org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.values()) {
@@ -161,6 +170,17 @@ public class Utils {
throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
}
+ public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(
+ org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) {
+ for (FunctionConfig.ProcessingGuarantees type : FunctionConfig.ProcessingGuarantees.values()) {
+ if (type.name().equals(processingGuarantees.name())) {
+ return type;
+ }
+ }
+ throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
+ }
+
+
public static Class<?> getSourceType(String className, ClassLoader classloader) {
Object userClass = Reflections.createInstance(className, classloader);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
new file mode 100644
index 0000000..1f67798
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link Reflections}.
+ */
+public class FunctionConfigUtilsTest {
+
+ @Test
+ public void testConvertBackFidelity() {
+ FunctionConfig functionConfig = new FunctionConfig();
+ functionConfig.setTenant("test-tenant");
+ functionConfig.setNamespace("test-namespace");
+ functionConfig.setName("test-function");
+ functionConfig.setClassName(IdentityFunction.class.getName());
+ Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+ inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+ functionConfig.setInputSpecs(inputSpecs);
+ functionConfig.setOutput("test-output");
+ functionConfig.setOutputSerdeClassName("test-serde");
+ functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+ functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ functionConfig.setRetainOrdering(false);
+ functionConfig.setUserConfig(new HashMap<>());
+ functionConfig.setAutoAck(true);
+ functionConfig.setTimeoutMs(2000l);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+ FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
+ assertEquals(
+ new Gson().toJson(functionConfig),
+ new Gson().toJson(convertedConfig)
+ );
+ }
+}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
new file mode 100644
index 0000000..c5d1ea0
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link Reflections}.
+ */
+public class SinkConfigUtilsTest {
+
+ @Test
+ public void testConvertBackFidelity() throws IOException {
+ SinkConfig sinkConfig = new SinkConfig();
+ sinkConfig.setTenant("test-tenant");
+ sinkConfig.setNamespace("test-namespace");
+ sinkConfig.setName("test-source");
+ sinkConfig.setArchive("builtin://jdbc");
+ sinkConfig.setSourceSubscriptionName("test-subscription");
+ Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+ inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+ sinkConfig.setInputSpecs(inputSpecs);
+ sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ sinkConfig.setConfigs(new HashMap<>());
+ sinkConfig.setRetainOrdering(false);
+ sinkConfig.setAutoAck(true);
+ sinkConfig.setTimeoutMs(2000l);
+ Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, null);
+ SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
+ assertEquals(
+ new Gson().toJson(sinkConfig),
+ new Gson().toJson(convertedConfig)
+ );
+ }
+}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
new file mode 100644
index 0000000..ef4ce61
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link Reflections}.
+ */
+public class SourceConfigUtilsTest {
+
+ @Test
+ public void testConvertBackFidelity() throws IOException {
+ SourceConfig sourceConfig = new SourceConfig();
+ sourceConfig.setTenant("test-tenant");
+ sourceConfig.setNamespace("test-namespace");
+ sourceConfig.setName("test-source");
+ sourceConfig.setArchive("builtin://jdbc");
+ sourceConfig.setTopicName("test-output");
+ sourceConfig.setSerdeClassName("test-serde");
+ sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ sourceConfig.setConfigs(new HashMap<>());
+ Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, null);
+ SourceConfig convertedConfig = SourceConfigUtils.convertFromDetails(functionDetails);
+ assertEquals(
+ new Gson().toJson(sourceConfig),
+ new Gson().toJson(convertedConfig)
+ );
+ }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 4faed11..920063e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -141,8 +141,8 @@ public class FunctionMetaDataManager implements AutoCloseable {
* @param namespace the namespace
* @return a list of function names
*/
- public synchronized Collection<String> listFunctions(String tenant, String namespace) {
- List<String> ret = new LinkedList<>();
+ public synchronized Collection<FunctionMetaData> listFunctions(String tenant, String namespace) {
+ List<FunctionMetaData> ret = new LinkedList<>();
if (!this.functionMetaDataMap.containsKey(tenant)) {
return ret;
@@ -152,7 +152,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
return ret;
}
for (FunctionMetaData functionMetaData : this.functionMetaDataMap.get(tenant).get(namespace).values()) {
- ret.add(functionMetaData.getFunctionDetails().getName());
+ ret.add(functionMetaData);
}
return ret;
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 0b245cf..44bb3bd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.*;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -97,9 +98,15 @@ import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import net.jodah.typetools.TypeResolver;
+// TODO:-Currently The source/sink/functions all share this backend. In the future it might make sense
+// to seperate them out in their own implementations as well.
@Slf4j
public class FunctionsImpl {
+ public static final String FUNCTION = "Function";
+ public static final String SOURCE = "Source";
+ public static final String SINK = "Sink";
+
private final Supplier<WorkerService> workerServiceSupplier;
public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
@@ -126,11 +133,10 @@ public class FunctionsImpl {
return true;
}
- public Response registerFunction(final String tenant, final String namespace, final String functionName,
+ public Response registerFunction(final String tenant, final String namespace, final String componentName,
final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
- final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
- final String sourceConfigJson, final String sinkConfigJson,
- final String clientRole) {
+ final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
+ final String componentType, final String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -138,23 +144,23 @@ public class FunctionsImpl {
try {
if (!isAuthorizedRole(tenant, clientRole)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to register function", tenant, namespace,
- functionName, clientRole);
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+ componentName, clientRole, componentType);
return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("client is not authorize to perform operation")).build();
}
} catch (PulsarAdminException e) {
- log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
+ if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+ log.error("{} {}/{}/{} already exists", componentType, tenant, namespace, componentName);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s already exists", functionName))).build();
+ .entity(new ErrorData(String.format("%s %s already exists", componentType, componentName))).build();
}
FunctionDetails functionDetails;
@@ -166,14 +172,14 @@ public class FunctionsImpl {
// validate parameters
try {
if (isPkgUrlProvided) {
- functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
- functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+ functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl,
+ functionDetailsJson, componentConfigJson, componentType);
} else {
- functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
- fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+ functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
+ fileDetail, functionDetailsJson, componentConfigJson, componentType);
}
} catch (Exception e) {
- log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+ log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
@@ -181,9 +187,9 @@ public class FunctionsImpl {
try {
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
} catch (Exception e) {
- log.error("Function {}/{}/{} cannot be admitted by the runtime factory", tenant, namespace, functionName);
+ log.error("{} {}/{}/{} cannot be admitted by the runtime factory", componentType, tenant, namespace, componentName);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build();
+ .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build();
}
// function state
@@ -196,7 +202,7 @@ public class FunctionsImpl {
packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
} else {
packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl
- : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
+ : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
if (!isPkgUrlProvided) {
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
}
@@ -207,10 +213,10 @@ public class FunctionsImpl {
: updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile);
}
- public Response updateFunction(final String tenant, final String namespace, final String functionName,
+ public Response updateFunction(final String tenant, final String namespace, final String componentName,
final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
- final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
- final String sourceConfigJson, final String sinkConfigJson, final String clientRole) {
+ final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
+ final String componentType, final String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -218,22 +224,22 @@ public class FunctionsImpl {
try {
if (!isAuthorizedRole(tenant, clientRole)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to update function", tenant, namespace,
- functionName, clientRole);
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+ componentName, clientRole, componentType);
return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("client is not authorize to perform operation")).build();
}
} catch (PulsarAdminException e) {
- log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+ .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
}
FunctionDetails functionDetails;
@@ -245,14 +251,14 @@ public class FunctionsImpl {
// validate parameters
try {
if (isPkgUrlProvided) {
- functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
- functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+ functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl,
+ functionDetailsJson, componentConfigJson, componentType);
} else {
- functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
- fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+ functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
+ fileDetail, functionDetailsJson, componentConfigJson, componentType);
}
} catch (Exception e) {
- log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+ log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
@@ -260,9 +266,9 @@ public class FunctionsImpl {
try {
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
} catch (Exception e) {
- log.error("Updated Function {}/{}/{} cannot be submitted to runtime factory", tenant, namespace, functionName);
+ log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", componentType, tenant, namespace, componentName);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build();
+ .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build();
}
// function state
@@ -276,7 +282,7 @@ public class FunctionsImpl {
packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
} else {
packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl
- : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
+ : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
if (!isPkgUrlProvided) {
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
}
@@ -287,8 +293,8 @@ public class FunctionsImpl {
: updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile);
}
- public Response deregisterFunction(final String tenant, final String namespace, final String functionName,
- String clientRole) {
+ public Response deregisterFunction(final String tenant, final String namespace, final String componentName,
+ String componentType, String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -296,35 +302,41 @@ public class FunctionsImpl {
try {
if (!isAuthorizedRole(tenant, clientRole)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister function", tenant, namespace,
- functionName, clientRole);
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace,
+ componentName, clientRole, componentType);
return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("client is not authorize to perform operation")).build();
}
} catch (PulsarAdminException e) {
- log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
// validate parameters
try {
- validateDeregisterRequestParams(tenant, namespace, functionName);
+ validateDeregisterRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid deregister function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+ log.error("Invalid deregister {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function to deregister does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+ log.error("{} to deregister does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+ .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+ }
+ FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+ return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
}
CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
- namespace, functionName);
+ namespace, componentName);
RequestResult requestResult = null;
try {
@@ -334,12 +346,12 @@ public class FunctionsImpl {
.entity(new ErrorData(requestResult.getMessage())).build();
}
} catch (ExecutionException e) {
- log.error("Execution Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName,
+ log.error("Execution Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName,
e);
return Response.serverError().type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getCause().getMessage())).build();
} catch (InterruptedException e) {
- log.error("Interrupted Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName,
+ log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName,
e);
return Response.status(Status.REQUEST_TIMEOUT).type(MediaType.APPLICATION_JSON).build();
}
@@ -347,7 +359,8 @@ public class FunctionsImpl {
return Response.status(Status.OK).entity(requestResult.toJson()).build();
}
- public Response getFunctionInfo(final String tenant, final String namespace, final String functionName)
+ public Response getFunctionInfo(final String tenant, final String namespace, final String componentName,
+ final String componentType)
throws IOException {
if (!isWorkerServiceAvailable()) {
@@ -356,25 +369,38 @@ public class FunctionsImpl {
// validate parameters
try {
- validateGetFunctionRequestParams(tenant, namespace, functionName);
+ validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid getFunction request @ /{}/{}/{}", tenant, namespace, functionName, e);
+ log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
- log.error("Function in getFunction does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+ log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+ .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
+ }
+ FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+ return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
}
- FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace,
- functionName);
- String functionDetailsJson = org.apache.pulsar.functions.utils.Utils
- .printJson(functionMetaData.getFunctionDetails());
- return Response.status(Status.OK).entity(functionDetailsJson).build();
+ String retval;
+ if (componentType.equals(FUNCTION)) {
+ FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+ retval = new Gson().toJson(config);
+ } else if (componentType.equals(SOURCE)) {
+ SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+ retval = new Gson().toJson(config);
+ } else {
+ SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+ retval = new Gson().toJson(config);
+ }
+ return Response.status(Status.OK).entity(retval).build();
}
public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
@@ -485,7 +511,7 @@ public class FunctionsImpl {
// validate parameters
try {
- validateGetFunctionRequestParams(tenant, namespace, functionName);
+ validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
} catch (IllegalArgumentException e) {
log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -519,7 +545,7 @@ public class FunctionsImpl {
// validate parameters
try {
- validateGetFunctionRequestParams(tenant, namespace, functionName);
+ validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
} catch (IllegalArgumentException e) {
log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -552,7 +578,7 @@ public class FunctionsImpl {
return Response.status(Status.OK).entity(jsonResponse).build();
}
- public Response listFunctions(final String tenant, final String namespace) {
+ public Response listFunctions(final String tenant, final String namespace, String componentType) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -562,16 +588,22 @@ public class FunctionsImpl {
try {
validateListFunctionRequestParams(tenant, namespace);
} catch (IllegalArgumentException e) {
- log.error("Invalid listFunctions request @ /{}/{}", tenant, namespace, e);
+ log.error("Invalid list {} request @ /{}/{}", componentType, tenant, namespace, e);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
- Collection<String> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
+ Collection<FunctionMetaData> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
+ List<String> retval = new LinkedList<>();
+ for (FunctionMetaData functionMetaData : functionStateList) {
+ if (calculateSubjectType(functionMetaData).equals(componentType)) {
+ retval.add(functionMetaData.getFunctionDetails().getName());
+ }
+ }
- return Response.status(Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
+ return Response.status(Status.OK).entity(new Gson().toJson(retval.toArray())).build();
}
private Response updateRequest(FunctionMetaData functionMetaData, File uploadedInputStreamAsFile) {
@@ -840,13 +872,13 @@ public class FunctionsImpl {
private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName,
String instanceId) throws IllegalArgumentException {
- validateGetFunctionRequestParams(tenant, namespace, functionName);
+ validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
if (instanceId == null) {
throw new IllegalArgumentException("Function Instance Id is not provided");
}
}
- private void validateGetFunctionRequestParams(String tenant, String namespace, String functionName)
+ private void validateGetFunctionRequestParams(String tenant, String namespace, String subject, String subjectType)
throws IllegalArgumentException {
if (tenant == null) {
@@ -855,12 +887,12 @@ public class FunctionsImpl {
if (namespace == null) {
throw new IllegalArgumentException("Namespace is not provided");
}
- if (functionName == null) {
- throw new IllegalArgumentException("Function Name is not provided");
+ if (subject == null) {
+ throw new IllegalArgumentException(subjectType + " Name is not provided");
}
}
- private void validateDeregisterRequestParams(String tenant, String namespace, String functionName)
+ private void validateDeregisterRequestParams(String tenant, String namespace, String subject, String subjectType)
throws IllegalArgumentException {
if (tenant == null) {
@@ -869,30 +901,30 @@ public class FunctionsImpl {
if (namespace == null) {
throw new IllegalArgumentException("Namespace is not provided");
}
- if (functionName == null) {
- throw new IllegalArgumentException("Function Name is not provided");
+ if (subject == null) {
+ throw new IllegalArgumentException(subjectType + " Name is not provided");
}
}
- private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName,
- String functionPkgUrl, String functionDetailsJson, String functionConfigJson,
- String sourceConfigJson, String sinkConfigJson)
+ private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String componentName,
+ String functionPkgUrl, String functionDetailsJson, String componentConfigJson,
+ String componentType)
throws IllegalArgumentException, IOException, URISyntaxException {
if (!isFunctionPackageUrlSupported(functionPkgUrl)) {
throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
}
- FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
- functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, functionPkgUrl, null);
+ FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
+ functionDetailsJson, componentConfigJson, componentType, functionPkgUrl, null);
return functionDetails;
}
- private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
+ private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
- String functionConfigJson, String sourceConfigJson, String sinkConfigJson)
+ String componentConfigJson, String componentType)
throws IllegalArgumentException, IOException, URISyntaxException {
- FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
- functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, null, uploadedInputStreamAsFile);
+ FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
+ functionDetailsJson, componentConfigJson, componentType,null, uploadedInputStreamAsFile);
if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) {
throw new IllegalArgumentException("Function Package is not provided");
}
@@ -964,40 +996,21 @@ public class FunctionsImpl {
return null;
}
- private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
- String functionDetailsJson, String functionConfigJson, String sourceConfigJson,
- String sinkConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
+ private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
+ String functionDetailsJson, String componentConfigJson, String componentType,
+ String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
if (tenant == null) {
throw new IllegalArgumentException("Tenant is not provided");
}
if (namespace == null) {
throw new IllegalArgumentException("Namespace is not provided");
}
- if (functionName == null) {
- throw new IllegalArgumentException("Function Name is not provided");
+ if (componentName == null) {
+ throw new IllegalArgumentException(String.format("%s Name is not provided", componentType));
}
- int numDefinitions = 0;
- if (!StringUtils.isEmpty(functionDetailsJson)) {
- numDefinitions++;
- }
- if (!StringUtils.isEmpty(functionConfigJson)) {
- numDefinitions++;
- }
- if (!StringUtils.isEmpty(sourceConfigJson)) {
- numDefinitions++;
- }
- if (!StringUtils.isEmpty(sinkConfigJson)) {
- numDefinitions++;
- }
- if (numDefinitions == 0) {
- throw new IllegalArgumentException("Function Info is not provided");
- }
- if (numDefinitions > 1) {
- throw new IllegalArgumentException("Conflicting Info provided");
- }
- if (!StringUtils.isEmpty(functionConfigJson)) {
- FunctionConfig functionConfig = new Gson().fromJson(functionConfigJson, FunctionConfig.class);
+ if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
+ FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
ClassLoader clsLoader = null;
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
@@ -1008,14 +1021,14 @@ public class FunctionsImpl {
ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader);
return FunctionConfigUtils.convert(functionConfig, clsLoader);
}
- if (!StringUtils.isEmpty(sourceConfigJson)) {
- SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class);
+ if (componentType.equals(SOURCE)) {
+ SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
NarClassLoader clsLoader = extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, true);
ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
return SourceConfigUtils.convert(sourceConfig, clsLoader);
}
- if (!StringUtils.isEmpty(sinkConfigJson)) {
- SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class);
+ if (componentType.equals(SINK)) {
+ SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
NarClassLoader clsLoader = extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, false);
ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
return SinkConfigUtils.convert(sinkConfig, clsLoader);
@@ -1260,4 +1273,23 @@ public class FunctionsImpl {
return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
}
+ public String calculateSubjectType(FunctionMetaData functionMetaData) {
+ SourceSpec sourceSpec = functionMetaData.getFunctionDetails().getSource();
+ SinkSpec sinkSpec = functionMetaData.getFunctionDetails().getSink();
+ if (sourceSpec.getInputSpecsCount() == 0) {
+ return SOURCE;
+ }
+ // Now its between sink and function
+
+ if (!isEmpty(sinkSpec.getBuiltin())) {
+ // if its built in, its a sink
+ return SINK;
+ }
+
+ if (isEmpty(sinkSpec.getClassName()) || sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
+ return FUNCTION;
+ }
+ return SINK;
+ }
+
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index d6e1439..405f88f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker.rest.api.v2;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -60,7 +61,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @FormDataParam("functionConfig") String functionConfigJson) {
return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+ functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
}
@@ -77,7 +78,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @FormDataParam("functionConfig") String functionConfigJson) {
return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+ functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
}
@@ -86,7 +87,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
@Path("/{tenant}/{namespace}/{functionName}")
public Response deregisterFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
- return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+ return functions.deregisterFunction(tenant, namespace, functionName, FunctionsImpl.FUNCTION, clientAppId());
}
@GET
@@ -96,7 +97,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("functionName") String functionName)
throws IOException {
return functions.getFunctionInfo(
- tenant, namespace, functionName);
+ tenant, namespace, functionName, FunctionsImpl.FUNCTION);
}
@GET
@@ -123,7 +124,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
return functions.listFunctions(
- tenant, namespace);
+ tenant, namespace, FunctionsImpl.FUNCTION);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 151c2c1..488f47d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
@@ -52,7 +53,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+ functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
}
@@ -68,7 +69,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+ functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
}
@@ -77,7 +78,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
@Path("/{tenant}/{namespace}/{sinkName}")
public Response deregisterSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
- return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+ return functions.deregisterFunction(tenant, namespace, sinkName, FunctionsImpl.SINK, clientAppId());
}
@GET
@@ -86,7 +87,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName)
throws IOException {
- return functions.getFunctionInfo(tenant, namespace, sinkName);
+ return functions.getFunctionInfo(tenant, namespace, sinkName, FunctionsImpl.SINK);
}
@GET
@@ -111,7 +112,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
@Path("/{tenant}/{namespace}")
public Response listSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return functions.listFunctions(tenant, namespace);
+ return functions.listFunctions(tenant, namespace, FunctionsImpl.SINK);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 44fac19..3b1222e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
@@ -52,7 +53,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
final @FormDataParam("sourceConfig") String sourceConfigJson) {
return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+ functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
}
@@ -68,7 +69,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
final @FormDataParam("sourceConfig") String sourceConfigJson) {
return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+ functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
}
@@ -77,7 +78,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
@Path("/{tenant}/{namespace}/{sourceName}")
public Response deregisterSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
- return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+ return functions.deregisterFunction(tenant, namespace, sourceName, FunctionsImpl.SOURCE, clientAppId());
}
@GET
@@ -86,7 +87,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName)
throws IOException {
- return functions.getFunctionInfo(tenant, namespace, sourceName);
+ return functions.getFunctionInfo(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
}
@GET
@@ -111,7 +112,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
@Path("/{tenant}/{namespace}")
public Response listSources(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return functions.listFunctions(tenant, namespace);
+ return functions.listFunctions(tenant, namespace, FunctionsImpl.SOURCE);
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index ac99f9a..7fc7c7f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -66,15 +66,16 @@ public class FunctionMetaDataManagerTest {
mockPulsarClient()));
Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
- functionMetaDataMap1.put("func-1", Function.FunctionMetaData.newBuilder().setFunctionDetails(
- Function.FunctionDetails.newBuilder().setName("func-1")).build());
- functionMetaDataMap1.put("func-2",
- Function.FunctionMetaData.newBuilder().setFunctionDetails(
- Function.FunctionDetails.newBuilder().setName("func-2")).build());
+ Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder().setName("func-1")).build();
+ functionMetaDataMap1.put("func-1", f1);
+ Function.FunctionMetaData f2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder().setName("func-2")).build();
+ functionMetaDataMap1.put("func-2", f2);
+ Function.FunctionMetaData f3 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+ Function.FunctionDetails.newBuilder().setName("func-3")).build();
Map<String, Function.FunctionMetaData> functionMetaDataInfoMap2 = new HashMap<>();
- functionMetaDataInfoMap2.put("func-3",
- Function.FunctionMetaData.newBuilder().setFunctionDetails(
- Function.FunctionDetails.newBuilder().setName("func-3")).build());
+ functionMetaDataInfoMap2.put("func-3", f3);
functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
@@ -86,13 +87,13 @@ public class FunctionMetaDataManagerTest {
Assert.assertEquals(2, functionMetaDataManager.listFunctions(
"tenant-1", "namespace-1").size());
Assert.assertTrue(functionMetaDataManager.listFunctions(
- "tenant-1", "namespace-1").contains("func-1"));
+ "tenant-1", "namespace-1").contains(f1));
Assert.assertTrue(functionMetaDataManager.listFunctions(
- "tenant-1", "namespace-1").contains("func-2"));
+ "tenant-1", "namespace-1").contains(f2));
Assert.assertEquals(1, functionMetaDataManager.listFunctions(
"tenant-1", "namespace-2").size());
Assert.assertTrue(functionMetaDataManager.listFunctions(
- "tenant-1", "namespace-2").contains("func-3"));
+ "tenant-1", "namespace-2").contains(f3));
}
@Test
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index d514fab..460d050 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -21,10 +21,9 @@ package org.apache.pulsar.functions.worker.rest.api.v2;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
@@ -62,6 +61,7 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
@@ -145,6 +145,7 @@ public class FunctionApiV2ResourceTest {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
+ doReturn("Function").when(this.resource).calculateSubjectType(any());
}
//
@@ -311,8 +312,7 @@ public class FunctionApiV2ResourceTest {
null,
null,
new Gson().toJson(functionConfig),
- null,
- null,
+ FunctionsImpl.FUNCTION,
null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -339,8 +339,7 @@ public class FunctionApiV2ResourceTest {
null,
null,
new Gson().toJson(functionConfig),
- null,
- null,
+ FunctionsImpl.FUNCTION,
null);
}
@@ -600,8 +599,7 @@ public class FunctionApiV2ResourceTest {
null,
null,
new Gson().toJson(functionConfig),
- null,
- null,
+ FunctionsImpl.FUNCTION,
null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -629,8 +627,7 @@ public class FunctionApiV2ResourceTest {
null,
null,
new Gson().toJson(functionConfig),
- null,
- null,
+ FunctionsImpl.FUNCTION,
null);
}
@@ -714,8 +711,7 @@ public class FunctionApiV2ResourceTest {
filePackageUrl,
null,
new Gson().toJson(functionConfig),
- null,
- null,
+ FunctionsImpl.FUNCTION,
null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -804,7 +800,8 @@ public class FunctionApiV2ResourceTest {
tenant,
namespace,
function,
- null);
+ FunctionsImpl.FUNCTION,
+ null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -815,7 +812,8 @@ public class FunctionApiV2ResourceTest {
tenant,
namespace,
function,
- null);
+ FunctionsImpl.FUNCTION,
+ null);
}
@Test
@@ -910,7 +908,8 @@ public class FunctionApiV2ResourceTest {
Response response = resource.getFunctionInfo(
tenant,
namespace,
- function);
+ function,
+ FunctionsImpl.FUNCTION);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -920,7 +919,8 @@ public class FunctionApiV2ResourceTest {
return resource.getFunctionInfo(
tenant,
namespace,
- function);
+ function,
+ FunctionsImpl.FUNCTION);
}
@Test
@@ -960,8 +960,8 @@ public class FunctionApiV2ResourceTest {
Response response = getDefaultFunctionInfo();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
assertEquals(
- org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
- response.getEntity());
+ new Gson().toJson(FunctionConfigUtils.convertFromDetails(functionDetails)),
+ response.getEntity());
}
//
@@ -991,7 +991,8 @@ public class FunctionApiV2ResourceTest {
) {
Response response = resource.listFunctions(
tenant,
- namespace);
+ namespace,
+ FunctionsImpl.FUNCTION);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -1000,13 +1001,46 @@ public class FunctionApiV2ResourceTest {
private Response listDefaultFunctions() {
return resource.listFunctions(
tenant,
- namespace);
+ namespace,
+ FunctionsImpl.FUNCTION);
}
@Test
public void testListFunctionsSuccess() throws Exception {
List<String> functions = Lists.newArrayList("test-1", "test-2");
- when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+ List<FunctionMetaData> metaDataList = new LinkedList<>();
+ FunctionMetaData functionMetaData1 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-1").build()
+ ).build();
+ FunctionMetaData functionMetaData2 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-2").build()
+ ).build();
+ metaDataList.add(functionMetaData1);
+ metaDataList.add(functionMetaData2);
+ when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(metaDataList);
+
+ Response response = listDefaultFunctions();
+ assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ assertEquals(new Gson().toJson(functions), response.getEntity());
+ }
+
+ @Test
+ public void testOnlyGetSources() throws Exception {
+ List<String> functions = Lists.newArrayList("test-2");
+ List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+ FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-1").build()).build();
+ functionMetaDataList.add(f1);
+ FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-2").build()).build();
+ functionMetaDataList.add(f2);
+ FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-3").build()).build();
+ functionMetaDataList.add(f3);
+ when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+ doReturn("Source").when(this.resource).calculateSubjectType(f1);
+ doReturn("Function").when(this.resource).calculateSubjectType(f2);
+ doReturn("Sink").when(this.resource).calculateSubjectType(f3);
Response response = listDefaultFunctions();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -1068,7 +1102,7 @@ public class FunctionApiV2ResourceTest {
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
- null, new Gson().toJson(functionConfig), null, null, null);
+ null, new Gson().toJson(functionConfig), FunctionsImpl.FUNCTION, null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 4e52c95..315f56d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v2;
+import com.google.common.collect.Lists;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
@@ -26,6 +27,8 @@ import org.apache.logging.log4j.core.config.Configurator;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
@@ -38,6 +41,7 @@ import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
@@ -51,6 +55,8 @@ import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -143,6 +149,7 @@ public class SinkApiV2ResourceTest {
doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
mockStatic(SinkConfigUtils.class);
when(SinkConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+ Mockito.doReturn("Sink").when(this.resource).calculateSubjectType(any());
}
//
@@ -188,7 +195,7 @@ public class SinkApiV2ResourceTest {
topicsToSerDeClassName,
className,
parallelism,
- "Function Name is not provided");
+ "Sink Name is not provided");
}
@Test
@@ -257,9 +264,8 @@ public class SinkApiV2ResourceTest {
details,
null,
null,
- null,
- null,
new Gson().toJson(sinkConfig),
+ FunctionsImpl.SINK,
null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -282,9 +288,8 @@ public class SinkApiV2ResourceTest {
mockedFormData,
null,
null,
- null,
- null,
new Gson().toJson(sinkConfig),
+ FunctionsImpl.SINK,
null);
}
@@ -296,7 +301,7 @@ public class SinkApiV2ResourceTest {
Response response = registerDefaultSink();
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
- assertEquals(new ErrorData("Function " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+ assertEquals(new ErrorData("Sink " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason);
}
@Test
@@ -421,7 +426,7 @@ public class SinkApiV2ResourceTest {
topicsToSerDeClassName,
className,
parallelism,
- "Function Name is not provided");
+ "Sink Name is not provided");
}
@Test
@@ -492,9 +497,8 @@ public class SinkApiV2ResourceTest {
details,
null,
null,
- null,
- null,
new Gson().toJson(sinkConfig),
+ FunctionsImpl.SINK,
null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -518,9 +522,8 @@ public class SinkApiV2ResourceTest {
mockedFormData,
null,
null,
- null,
- null,
new Gson().toJson(sinkConfig),
+ FunctionsImpl.SINK,
null);
}
@@ -530,7 +533,7 @@ public class SinkApiV2ResourceTest {
Response response = updateDefaultSink();
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
- assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+ assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
}
@Test
@@ -600,9 +603,8 @@ public class SinkApiV2ResourceTest {
null,
filePackageUrl,
null,
- null,
- null,
new Gson().toJson(sinkConfig),
+ FunctionsImpl.SINK,
null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -678,7 +680,7 @@ public class SinkApiV2ResourceTest {
tenant,
namespace,
null,
- "Function Name");
+ "Sink Name");
}
private void testDeregisterSinkMissingArguments(
@@ -691,6 +693,7 @@ public class SinkApiV2ResourceTest {
tenant,
namespace,
sink,
+ FunctionsImpl.SINK,
null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -702,6 +705,7 @@ public class SinkApiV2ResourceTest {
tenant,
namespace,
sink,
+ FunctionsImpl.SINK,
null);
}
@@ -711,7 +715,7 @@ public class SinkApiV2ResourceTest {
Response response = deregisterDefaultSink();
assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
- assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+ assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
}
@Test
@@ -757,109 +761,116 @@ public class SinkApiV2ResourceTest {
assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
}
- // Source Info doesn't exist. Maybe one day they might be added
//
- // Get Function Info
+ // Get Sink Info
//
- /*
@Test
- public void testGetFunctionMissingTenant() throws Exception {
- testGetFunctionMissingArguments(
+ public void testGetSinkMissingTenant() throws Exception {
+ testGetSinkMissingArguments(
null,
namespace,
- source,
+ sink,
"Tenant");
}
@Test
- public void testGetFunctionMissingNamespace() throws Exception {
- testGetFunctionMissingArguments(
+ public void testGetSinkMissingNamespace() throws Exception {
+ testGetSinkMissingArguments(
tenant,
null,
- source,
+ sink,
"Namespace");
}
@Test
- public void testGetFunctionMissingFunctionName() throws Exception {
- testGetFunctionMissingArguments(
+ public void testGetSinkMissingFunctionName() throws Exception {
+ testGetSinkMissingArguments(
tenant,
namespace,
null,
- "Function Name");
+ "Sink Name");
}
- private void testGetFunctionMissingArguments(
+ private void testGetSinkMissingArguments(
String tenant,
String namespace,
- String function,
+ String sink,
String missingFieldName
) throws IOException {
Response response = resource.getFunctionInfo(
tenant,
namespace,
- function);
+ sink,
+ FunctionsImpl.SINK);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
}
- private Response getDefaultFunctionInfo() throws IOException {
+ private Response getDefaultSinkInfo() throws IOException {
return resource.getFunctionInfo(
tenant,
namespace,
- source);
+ sink,
+ FunctionsImpl.SINK);
}
@Test
- public void testGetNotExistedFunction() throws IOException {
- when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+ public void testGetNotExistedSink() throws IOException {
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
- Response response = getDefaultFunctionInfo();
+ Response response = getDefaultSinkInfo();
assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
- assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+ assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
}
@Test
- public void testGetFunctionSuccess() throws Exception {
- when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+ public void testGetSinkSuccess() throws Exception {
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
- SinkSpec sinkSpec = SinkSpec.newBuilder()
- .setTopic(outputTopic)
- .setSerDeClassName(outputSerdeClassName).build();
+ Function.SourceSpec sourceSpec = Function.SourceSpec.newBuilder()
+ .setSubscriptionType(Function.SubscriptionType.SHARED)
+ .setSubscriptionName(subscriptionName)
+ .putInputSpecs("input", Function.ConsumerSpec.newBuilder()
+ .setSerdeClassName(TopicSchema.DEFAULT_SERDE)
+ .setIsRegexPattern(false)
+ .build()).build();
+ Function.SinkSpec sinkSpec = Function.SinkSpec.newBuilder()
+ .setBuiltin("jdbc")
+ .build();
FunctionDetails functionDetails = FunctionDetails.newBuilder()
- .setClassName(className)
+ .setClassName(IdentityFunction.class.getName())
.setSink(sinkSpec)
- .setName(source)
+ .setName(sink)
.setNamespace(namespace)
- .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
+ .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE)
.setTenant(tenant)
.setParallelism(parallelism)
- .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
- .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+ .setRuntime(FunctionDetails.Runtime.JAVA)
+ .setSource(sourceSpec).build();
FunctionMetaData metaData = FunctionMetaData.newBuilder()
.setCreateTime(System.currentTimeMillis())
.setFunctionDetails(functionDetails)
- .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
+ .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
.setVersion(1234)
.build();
- when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData);
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(metaData);
- Response response = getDefaultFunctionInfo();
+ Response response = getDefaultSinkInfo();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
assertEquals(
- org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ new Gson().toJson(SinkConfigUtils.convertFromDetails(functionDetails)),
response.getEntity());
}
//
- // List Functions
+ // List Sinks
//
@Test
- public void testListFunctionsMissingTenant() throws Exception {
- testListFunctionsMissingArguments(
+ public void testListSinksMissingTenant() throws Exception {
+ testListSinksMissingArguments(
null,
namespace,
"Tenant");
@@ -867,39 +878,70 @@ public class SinkApiV2ResourceTest {
@Test
public void testListFunctionsMissingNamespace() throws Exception {
- testListFunctionsMissingArguments(
+ testListSinksMissingArguments(
tenant,
null,
"Namespace");
}
- private void testListFunctionsMissingArguments(
+ private void testListSinksMissingArguments(
String tenant,
String namespace,
String missingFieldName
) {
Response response = resource.listFunctions(
tenant,
- namespace);
+ namespace,
+ FunctionsImpl.SINK);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
}
- private Response listDefaultFunctions() {
+ private Response listDefaultSinks() {
return resource.listFunctions(
tenant,
- namespace);
+ namespace,
+ FunctionsImpl.SINK);
}
@Test
- public void testListFunctionsSuccess() throws Exception {
+ public void testListSinksSuccess() throws Exception {
List<String> functions = Lists.newArrayList("test-1", "test-2");
- when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+ List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+ functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-1").build()
+ ).build());
+ functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-2").build()
+ ).build());
+ when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+
+ Response response = listDefaultSinks();
+ assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ assertEquals(new Gson().toJson(functions), response.getEntity());
+ }
- Response response = listDefaultFunctions();
+ @Test
+ public void testOnlyGetSinks() throws Exception {
+ List<String> functions = Lists.newArrayList("test-3");
+ List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+ FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-1").build()).build();
+ functionMetaDataList.add(f1);
+ FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-2").build()).build();
+ functionMetaDataList.add(f2);
+ FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-3").build()).build();
+ functionMetaDataList.add(f3);
+ when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+ doReturn("Source").when(this.resource).calculateSubjectType(f1);
+ doReturn("Function").when(this.resource).calculateSubjectType(f2);
+ doReturn("Sink").when(this.resource).calculateSubjectType(f3);
+
+ Response response = listDefaultSinks();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
assertEquals(new Gson().toJson(functions), response.getEntity());
}
- */
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 1ca869e..eee684f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v2;
+import com.google.common.collect.Lists;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
@@ -26,6 +27,7 @@ import org.apache.logging.log4j.core.config.Configurator;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function.*;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
@@ -37,6 +39,7 @@ import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
@@ -48,6 +51,8 @@ import org.testng.annotations.Test;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.*;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -133,6 +138,7 @@ public class SourceApiV2ResourceTest {
doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
mockStatic(SourceConfigUtils.class);
when(SourceConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+ Mockito.doReturn("Source").when(this.resource).calculateSubjectType(any());
}
//
@@ -181,7 +187,7 @@ public class SourceApiV2ResourceTest {
outputSerdeClassName,
className,
parallelism,
- "Function Name is not provided");
+ "Source Name is not provided");
}
@Test
@@ -256,9 +262,8 @@ public class SourceApiV2ResourceTest {
details,
null,
null,
- null,
new Gson().toJson(sourceConfig),
- null,
+ FunctionsImpl.SOURCE,
null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -282,9 +287,8 @@ public class SourceApiV2ResourceTest {
mockedFormData,
null,
null,
- null,
new Gson().toJson(sourceConfig),
- null,
+ FunctionsImpl.SOURCE,
null);
}
@@ -296,7 +300,7 @@ public class SourceApiV2ResourceTest {
Response response = registerDefaultSource();
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
- assertEquals(new ErrorData("Function " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+ assertEquals(new ErrorData("Source " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason);
}
@Test
@@ -424,7 +428,7 @@ public class SourceApiV2ResourceTest {
outputSerdeClassName,
className,
parallelism,
- "Function Name is not provided");
+ "Source Name is not provided");
}
@Test
@@ -501,9 +505,8 @@ public class SourceApiV2ResourceTest {
details,
null,
null,
- null,
new Gson().toJson(sourceConfig),
- null,
+ FunctionsImpl.SOURCE,
null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -528,9 +531,8 @@ public class SourceApiV2ResourceTest {
mockedFormData,
null,
null,
- null,
new Gson().toJson(sourceConfig),
- null,
+ FunctionsImpl.SOURCE,
null);
}
@@ -540,7 +542,7 @@ public class SourceApiV2ResourceTest {
Response response = updateDefaultSource();
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
- assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+ assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
}
@Test
@@ -611,9 +613,8 @@ public class SourceApiV2ResourceTest {
null,
filePackageUrl,
null,
- null,
new Gson().toJson(sourceConfig),
- null,
+ FunctionsImpl.SOURCE,
null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -689,7 +690,7 @@ public class SourceApiV2ResourceTest {
tenant,
namespace,
null,
- "Function Name");
+ "Source Name");
}
private void testDeregisterSourceMissingArguments(
@@ -702,6 +703,7 @@ public class SourceApiV2ResourceTest {
tenant,
namespace,
function,
+ FunctionsImpl.SOURCE,
null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -713,6 +715,7 @@ public class SourceApiV2ResourceTest {
tenant,
namespace,
source,
+ FunctionsImpl.SOURCE,
null);
}
@@ -722,7 +725,7 @@ public class SourceApiV2ResourceTest {
Response response = deregisterDefaultSource();
assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
- assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+ assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
}
@Test
@@ -768,15 +771,13 @@ public class SourceApiV2ResourceTest {
assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
}
- // Source Info doesn't exist. Maybe one day they might be added
//
- // Get Function Info
+ // Get Source Info
//
- /*
@Test
- public void testGetFunctionMissingTenant() throws Exception {
- testGetFunctionMissingArguments(
+ public void testGetSourceMissingTenant() throws Exception {
+ testGetSourceMissingArguments(
null,
namespace,
source,
@@ -784,8 +785,8 @@ public class SourceApiV2ResourceTest {
}
@Test
- public void testGetFunctionMissingNamespace() throws Exception {
- testGetFunctionMissingArguments(
+ public void testGetSourceMissingNamespace() throws Exception {
+ testGetSourceMissingArguments(
tenant,
null,
source,
@@ -793,62 +794,66 @@ public class SourceApiV2ResourceTest {
}
@Test
- public void testGetFunctionMissingFunctionName() throws Exception {
- testGetFunctionMissingArguments(
+ public void testGetSourceMissingFunctionName() throws Exception {
+ testGetSourceMissingArguments(
tenant,
namespace,
null,
- "Function Name");
+ "Source Name");
}
- private void testGetFunctionMissingArguments(
+ private void testGetSourceMissingArguments(
String tenant,
String namespace,
- String function,
+ String source,
String missingFieldName
) throws IOException {
Response response = resource.getFunctionInfo(
tenant,
namespace,
- function);
+ source,
+ FunctionsImpl.SOURCE);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
}
- private Response getDefaultFunctionInfo() throws IOException {
+ private Response getDefaultSourceInfo() throws IOException {
return resource.getFunctionInfo(
tenant,
namespace,
- source);
+ source,
+ FunctionsImpl.SOURCE);
}
@Test
- public void testGetNotExistedFunction() throws IOException {
+ public void testGetNotExistedSource() throws IOException {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
- Response response = getDefaultFunctionInfo();
+ Response response = getDefaultSourceInfo();
assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
- assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+ assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
}
@Test
- public void testGetFunctionSuccess() throws Exception {
+ public void testGetSourceSuccess() throws Exception {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+ SourceSpec sourceSpec = SourceSpec.newBuilder().setBuiltin("jdbc").build();
SinkSpec sinkSpec = SinkSpec.newBuilder()
.setTopic(outputTopic)
.setSerDeClassName(outputSerdeClassName).build();
FunctionDetails functionDetails = FunctionDetails.newBuilder()
- .setClassName(className)
+ .setClassName(IdentityFunction.class.getName())
.setSink(sinkSpec)
.setName(source)
.setNamespace(namespace)
- .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
+ .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
+ .setRuntime(FunctionDetails.Runtime.JAVA)
+ .setAutoAck(true)
.setTenant(tenant)
.setParallelism(parallelism)
- .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
- .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+ .setSource(sourceSpec).build();
FunctionMetaData metaData = FunctionMetaData.newBuilder()
.setCreateTime(System.currentTimeMillis())
.setFunctionDetails(functionDetails)
@@ -857,60 +862,90 @@ public class SourceApiV2ResourceTest {
.build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData);
- Response response = getDefaultFunctionInfo();
+ Response response = getDefaultSourceInfo();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
assertEquals(
- org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ new Gson().toJson(SourceConfigUtils.convertFromDetails(functionDetails)),
response.getEntity());
}
//
- // List Functions
+ // List Sources
//
@Test
- public void testListFunctionsMissingTenant() throws Exception {
- testListFunctionsMissingArguments(
+ public void testListSourcesMissingTenant() throws Exception {
+ testListSourcesMissingArguments(
null,
namespace,
"Tenant");
}
@Test
- public void testListFunctionsMissingNamespace() throws Exception {
- testListFunctionsMissingArguments(
+ public void testListSourcesMissingNamespace() throws Exception {
+ testListSourcesMissingArguments(
tenant,
null,
"Namespace");
}
- private void testListFunctionsMissingArguments(
+ private void testListSourcesMissingArguments(
String tenant,
String namespace,
String missingFieldName
) {
Response response = resource.listFunctions(
tenant,
- namespace);
+ namespace,
+ FunctionsImpl.SOURCE);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
}
- private Response listDefaultFunctions() {
+ private Response listDefaultSources() {
return resource.listFunctions(
tenant,
- namespace);
+ namespace,FunctionsImpl.SOURCE);
}
@Test
- public void testListFunctionsSuccess() throws Exception {
+ public void testListSourcesSuccess() throws Exception {
List<String> functions = Lists.newArrayList("test-1", "test-2");
- when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+ List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+ functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-1").build()
+ ).build());
+ functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-2").build()
+ ).build());
+ when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+
+ Response response = listDefaultSources();
+ assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ assertEquals(new Gson().toJson(functions), response.getEntity());
+ }
- Response response = listDefaultFunctions();
+ @Test
+ public void testOnlyGetSources() throws Exception {
+ List<String> functions = Lists.newArrayList("test-1");
+ List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+ FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-1").build()).build();
+ functionMetaDataList.add(f1);
+ FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-2").build()).build();
+ functionMetaDataList.add(f2);
+ FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+ FunctionDetails.newBuilder().setName("test-3").build()).build();
+ functionMetaDataList.add(f3);
+ when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+ doReturn("Source").when(this.resource).calculateSubjectType(f1);
+ doReturn("Function").when(this.resource).calculateSubjectType(f2);
+ doReturn("Sink").when(this.resource).calculateSubjectType(f3);
+
+ Response response = listDefaultSources();
assertEquals(Status.OK.getStatusCode(), response.getStatus());
assertEquals(new Gson().toJson(functions), response.getEntity());
}
- */
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b8452f1..6659fcf 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -214,7 +214,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
boolean builtin) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
- "functions",
+ "sink",
"get",
"--tenant", tenant,
"--namespace", namespace,
@@ -224,7 +224,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
log.info("Get sink info : {}", result.getStdout());
if (builtin) {
assertTrue(
- result.getStdout().contains("\"builtin\": \"" + tester.getSinkType().name().toLowerCase() + "\""),
+ result.getStdout().contains("\"archive\": \"builtin://" + tester.getSinkType().name().toLowerCase() + "\""),
result.getStdout()
);
} else {
@@ -366,7 +366,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
protected void getSinkInfoNotFound(String tenant, String namespace, String sinkName) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
- "functions",
+ "sink",
"get",
"--tenant", tenant,
"--namespace", namespace,
@@ -376,7 +376,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
pulsarCluster.getAnyWorker().execCmd(commands);
fail("Command should have exited with non-zero");
} catch (ContainerExecException e) {
- assertTrue(e.getResult().getStderr().contains("Reason: Function " + sinkName + " doesn't exist"));
+ assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist"));
}
}
@@ -465,7 +465,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String sourceName) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
- "functions",
+ "source",
"get",
"--tenant", tenant,
"--namespace", namespace,
@@ -474,7 +474,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source info : {}", result.getStdout());
assertTrue(
- result.getStdout().contains("\"builtin\": \"" + tester.getSourceType() + "\""),
+ result.getStdout().contains("\"archive\": \"builtin://" + tester.getSourceType() + "\""),
result.getStdout()
);
}
@@ -564,7 +564,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
protected void getSourceInfoNotFound(String tenant, String namespace, String sourceName) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
- "functions",
+ "source",
"get",
"--tenant", tenant,
"--namespace", namespace,
@@ -574,7 +574,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
pulsarCluster.getAnyWorker().execCmd(commands);
fail("Command should have exited with non-zero");
} catch (ContainerExecException e) {
- assertTrue(e.getResult().getStderr().contains("Reason: Function " + sourceName + " doesn't exist"));
+ assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
}
}