You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/08/31 12:24:51 UTC
[pulsar] branch master updated: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks (#16740)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dab0d1f389e [PIP-193] [feature][connectors] Add support for a transform Function in Sinks (#16740)
dab0d1f389e is described below
commit dab0d1f389ee66277c8350072af304895619eb9a
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Wed Aug 31 14:24:42 2022 +0200
[PIP-193] [feature][connectors] Add support for a transform Function in Sinks (#16740)
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 7 +-
.../org/apache/pulsar/client/admin/Functions.java | 36 ++
.../org/apache/pulsar/common/io/SinkConfig.java | 3 +
.../client/admin/internal/FunctionsImpl.java | 16 +
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 38 +++
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 7 +-
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 19 ++
.../org/apache/pulsar/admin/cli/TestCmdSinks.java | 62 ++++
.../pulsar/functions/instance/InstanceConfig.java | 1 +
.../functions/instance/JavaInstanceRunnable.java | 198 ++++++++++-
.../functions/instance/OutputRecordSinkRecord.java | 11 +
.../functions/instance/SinkSchemaInfoProvider.java | 79 +++++
.../instance/JavaInstanceRunnableTest.java | 2 +-
.../functions/api/examples/RecordFunction.java | 1 +
.../org/apache/pulsar/functions/LocalRunner.java | 14 +-
.../proto/src/main/proto/Function.proto | 1 +
.../functions/runtime/JavaInstanceStarter.java | 12 +
.../pulsar/functions/runtime/RuntimeFactory.java | 1 +
.../pulsar/functions/runtime/RuntimeSpawner.java | 3 +
.../pulsar/functions/runtime/RuntimeUtils.java | 20 +-
.../runtime/kubernetes/KubernetesRuntime.java | 87 ++---
.../kubernetes/KubernetesRuntimeFactory.java | 3 +
.../functions/runtime/process/ProcessRuntime.java | 2 +
.../runtime/process/ProcessRuntimeFactory.java | 3 +
.../functions/runtime/thread/ThreadRuntime.java | 35 +-
.../runtime/thread/ThreadRuntimeFactory.java | 3 +
.../pulsar/functions/runtime/RuntimeUtilsTest.java | 1 +
.../runtime/kubernetes/KubernetesRuntimeTest.java | 44 +--
.../runtime/process/ProcessRuntimeTest.java | 23 +-
.../pulsar/functions/utils/FunctionCommon.java | 83 ++---
.../functions/utils/FunctionConfigUtils.java | 3 +-
.../pulsar/functions/utils/SinkConfigUtils.java | 85 ++++-
.../functions/utils/SinkConfigUtilsTest.java | 83 ++++-
.../pulsar/functions/worker/FunctionActioner.java | 93 ++++--
.../functions/worker/FunctionRuntimeManager.java | 7 +-
.../functions/worker/rest/api/ComponentImpl.java | 178 ++++++++--
.../functions/worker/rest/api/FunctionsImpl.java | 110 +-----
.../functions/worker/rest/api/SinksImpl.java | 149 ++++-----
.../functions/worker/rest/api/SourcesImpl.java | 89 +----
.../worker/rest/api/v3/FunctionsApiV3Resource.java | 7 +-
.../functions/worker/service/api/Component.java | 15 +-
.../functions/worker/FunctionActionerTest.java | 75 +++--
.../worker/FunctionRuntimeManagerTest.java | 9 +-
.../worker/rest/api/FunctionsImplTest.java | 4 +-
.../rest/api/v3/FunctionApiV3ResourceTest.java | 60 ++++
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 245 +++++++++++---
.../tests/integration/io/TestLoggingSink.java | 74 +++++
.../io/SinkWithTransformFunctionTest.java | 370 +++++++++++++++++++++
48 files changed, 1890 insertions(+), 581 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index d5380407491..be9dc8c7b71 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -711,9 +711,12 @@ public class FunctionsBase extends AdminResource {
@ApiParam(value = "The namespace of a Pulsar Function")
final @PathParam("namespace") String namespace,
@ApiParam(value = "The name of a Pulsar Function")
- final @PathParam("functionName") String functionName) {
+ final @PathParam("functionName") String functionName,
+ @ApiParam(value = "Whether to download the transform-function")
+ final @QueryParam("transform-function") boolean transformFunction) {
- return functions().downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
+ return functions()
+ .downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData(), transformFunction);
}
@GET
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 5bf289f9b65..55e0855ca55 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -757,6 +757,42 @@ public interface Functions {
CompletableFuture<Void> downloadFunctionAsync(
String destinationFile, String tenant, String namespace, String function);
+ /**
+ * Download the code package of a Function, or of the transform Function attached to a connector.
+ *
+ * @param destinationFile
+ * file where data should be downloaded to
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ * @param transformFunction
+ * Whether to download the transform function (for sources and sinks)
+ * @throws PulsarAdminException
+ * if the download request fails
+ */
+ void downloadFunction(String destinationFile, String tenant, String namespace, String function,
+ boolean transformFunction) throws PulsarAdminException;
+
+ /**
+ * Download the code package of a Function, or of the transform Function attached to a connector,
+ * asynchronously.
+ *
+ * @param destinationFile
+ * file where data should be downloaded to
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ * @param transformFunction
+ * Whether to download the transform function (for sources and sinks)
+ * @return a future that completes when the download has finished
+ */
+ CompletableFuture<Void> downloadFunctionAsync(
+ String destinationFile, String tenant, String namespace, String function, boolean transformFunction);
+
+
/**
* Deprecated in favor of getting sources and sinks for their own APIs.
* <p/>
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index f1844e6090c..316cdd69e07 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -91,4 +91,7 @@ public class SinkConfig {
// to change behavior at runtime. Currently, this primarily used by the KubernetesManifestCustomizer
// interface
private String customRuntimeOptions;
+ private String transformFunction;
+ private String transformFunctionClassName;
+ private String transformFunctionConfig;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index ff2c22daae5..482846ad8ac 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -606,6 +606,22 @@ public class FunctionsImpl extends ComponentResource implements Functions {
functions.path(tenant).path(namespace).path(functionName).path("download"));
}
+    @Override
+    public void downloadFunction(String destinationPath, String tenant, String namespace, String functionName,
+                                 boolean transformFunction) throws PulsarAdminException {
+        // Same REST endpoint as the plain download; the flag is forwarded as the "transform-function" query param.
+        downloadFile(destinationPath,
+                functions.path(tenant)
+                        .path(namespace)
+                        .path(functionName)
+                        .path("download")
+                        .queryParam("transform-function", transformFunction));
+    }
+
+    @Override
+    public CompletableFuture<Void> downloadFunctionAsync(String destinationPath, String tenant, String namespace,
+                                                         String functionName, boolean transformFunction) {
+        // Asynchronous variant of the same download endpoint, forwarding the "transform-function" query param.
+        return downloadFileAsync(destinationPath,
+                functions.path(tenant)
+                        .path(namespace)
+                        .path(functionName)
+                        .path("download")
+                        .queryParam("transform-function", transformFunction));
+    }
+
+
@Override
public void downloadFunction(String destinationPath, String path) throws PulsarAdminException {
downloadFile(destinationPath, functions.path("download").queryParam("path", path));
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index e98894e9c65..4fc723e5f4c 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -859,6 +859,44 @@ public class CmdFunctionsTest {
verify(functions, times(1)).updateFunctionWithUrl(any(FunctionConfig.class), anyString(), eq(updateOptions));
}
+ @Test
+ public void testDownloadFunction() throws Exception {
+ cmd.run(new String[] {
+ "download",
+ "--destination-file", JAR_NAME,
+ "--name", FN_NAME,
+ "--tenant", TENANT,
+ "--namespace", NAMESPACE
+ });
+ verify(functions, times(1))
+ .downloadFunction(JAR_NAME, TENANT, NAMESPACE, FN_NAME, false);
+ }
+
+ @Test
+ public void testDownloadFunctionByPath() throws Exception {
+ cmd.run(new String[] {
+ "download",
+ "--destination-file", JAR_NAME,
+ "--path", PACKAGE_URL
+ });
+ verify(functions, times(1))
+ .downloadFunction(JAR_NAME, PACKAGE_URL);
+ }
+
+ @Test
+ public void testDownloadTransformFunction() throws Exception {
+ cmd.run(new String[] {
+ "download",
+ "--destination-file", JAR_NAME,
+ "--name", FN_NAME,
+ "--tenant", TENANT,
+ "--namespace", NAMESPACE,
+ "--transform-function"
+ });
+ verify(functions, times(1))
+ .downloadFunction(JAR_NAME, TENANT, NAMESPACE, FN_NAME, true);
+ }
+
public static class ConsoleOutputCapturer {
private ByteArrayOutputStream stdout;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index a37197ccc28..43d13bc97dd 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1171,6 +1171,10 @@ public class CmdFunctions extends CmdBase {
description = "Path or functionPkgUrl to store the content",
listConverter = StringConverter.class, required = false, hidden = true)
protected String path;
+ @Parameter(
+ names = "--transform-function",
+ description = "Download the transform Function of the connector")
+ protected Boolean transformFunction = false;
private void mergeArgs() {
if (isBlank(destinationFile) && !isBlank(deprecatedDestinationFile)) {
@@ -1195,7 +1199,8 @@ public class CmdFunctions extends CmdBase {
if (path != null) {
getAdmin().functions().downloadFunction(destinationFile, path);
} else {
- getAdmin().functions().downloadFunction(destinationFile, tenant, namespace, functionName);
+ getAdmin().functions()
+ .downloadFunction(destinationFile, tenant, namespace, functionName, transformFunction);
}
print("Downloaded successfully");
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index f89dc5b8cc0..7aae6ece799 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -398,6 +398,13 @@ public class CmdSinks extends CmdBase {
@Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates "
+ "how the secret is fetched by the underlying secrets provider")
protected String secretsString;
+ @Parameter(names = "--transform-function", description = "Transform function applied before the Sink")
+ protected String transformFunction;
+ @Parameter(names = "--transform-function-classname", description = "The transform function class name")
+ protected String transformFunctionClassName;
+ @Parameter(names = "--transform-function-config", description = "Configuration of the transform function "
+ + "applied before the Sink")
+ protected String transformFunctionConfig;
protected SinkConfig sinkConfig;
@@ -578,6 +585,18 @@ public class CmdSinks extends CmdBase {
sinkConfig.setSecrets(secretsMap);
}
+ if (transformFunction != null) {
+ sinkConfig.setTransformFunction(transformFunction);
+ }
+
+ if (transformFunctionClassName != null) {
+ sinkConfig.setTransformFunctionClassName(transformFunctionClassName);
+ }
+
+ if (transformFunctionConfig != null) {
+ sinkConfig.setTransformFunctionConfig(transformFunctionConfig);
+ }
+
// check if configs are valid
validateSinkConfigs(sinkConfig);
}
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 7a00f0429fc..1cb86102802 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -83,6 +83,9 @@ public class TestCmdSinks {
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
+ private static final String TRANSFORM_FUNCTION = "transform";
+ private static final String TRANSFORM_FUNCTION_CLASSNAME = "TransformFunction";
+ private static final String TRANSFORM_FUNCTION_CONFIG = "{\"test_function_config\": \"\"}";
private PulsarAdmin pulsarAdmin;
private Sinks sink;
@@ -146,6 +149,11 @@ public class TestCmdSinks {
sinkConfig.setArchive(JAR_FILE_PATH);
sinkConfig.setResources(new Resources(CPU, RAM, DISK));
sinkConfig.setConfigs(createSink.parseConfigs(SINK_CONFIG_STRING));
+
+ sinkConfig.setTransformFunction(TRANSFORM_FUNCTION);
+ sinkConfig.setTransformFunctionClassName(TRANSFORM_FUNCTION_CLASSNAME);
+ sinkConfig.setTransformFunctionConfig(TRANSFORM_FUNCTION_CONFIG);
+
return sinkConfig;
}
@@ -166,6 +174,9 @@ public class TestCmdSinks {
RAM,
DISK,
SINK_CONFIG_STRING,
+ TRANSFORM_FUNCTION,
+ TRANSFORM_FUNCTION_CLASSNAME,
+ TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
@@ -188,6 +199,9 @@ public class TestCmdSinks {
RAM,
DISK,
SINK_CONFIG_STRING,
+ TRANSFORM_FUNCTION,
+ TRANSFORM_FUNCTION_CLASSNAME,
+ TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
@@ -211,6 +225,9 @@ public class TestCmdSinks {
RAM,
DISK,
SINK_CONFIG_STRING,
+ TRANSFORM_FUNCTION,
+ TRANSFORM_FUNCTION_CLASSNAME,
+ TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
@@ -233,6 +250,9 @@ public class TestCmdSinks {
RAM,
DISK,
SINK_CONFIG_STRING,
+ TRANSFORM_FUNCTION,
+ TRANSFORM_FUNCTION_CLASSNAME,
+ TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
@@ -255,6 +275,9 @@ public class TestCmdSinks {
RAM,
DISK,
SINK_CONFIG_STRING,
+ TRANSFORM_FUNCTION,
+ TRANSFORM_FUNCTION_CLASSNAME,
+ TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
@@ -277,6 +300,9 @@ public class TestCmdSinks {
RAM,
DISK,
SINK_CONFIG_STRING,
+ TRANSFORM_FUNCTION,
+ TRANSFORM_FUNCTION_CLASSNAME,
+ TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
@@ -301,6 +327,9 @@ public class TestCmdSinks {
RAM,
DISK,
SINK_CONFIG_STRING,
+ TRANSFORM_FUNCTION,
+ TRANSFORM_FUNCTION_CLASSNAME,
+ TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
@@ -323,6 +352,9 @@ public class TestCmdSinks {
RAM,
DISK,
null,
+ TRANSFORM_FUNCTION,
+ TRANSFORM_FUNCTION_CLASSNAME,
+ TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
@@ -341,6 +373,9 @@ public class TestCmdSinks {
Long ram,
Long disk,
String sinkConfigString,
+ String transformFunction,
+ String transformFunctionClassName,
+ String transformFunctionConfig,
SinkConfig sinkConfig) throws Exception {
// test create sink
@@ -357,6 +392,9 @@ public class TestCmdSinks {
createSink.ram = ram;
createSink.disk = disk;
createSink.sinkConfigString = sinkConfigString;
+ createSink.transformFunction = transformFunction;
+ createSink.transformFunctionClassName = transformFunctionClassName;
+ createSink.transformFunctionConfig = transformFunctionConfig;
createSink.processArguments();
@@ -376,6 +414,9 @@ public class TestCmdSinks {
updateSink.ram = ram;
updateSink.disk = disk;
updateSink.sinkConfigString = sinkConfigString;
+ updateSink.transformFunction = transformFunction;
+ updateSink.transformFunctionClassName = transformFunctionClassName;
+ updateSink.transformFunctionConfig = transformFunctionConfig;
updateSink.processArguments();
@@ -395,6 +436,9 @@ public class TestCmdSinks {
localSinkRunner.ram = ram;
localSinkRunner.disk = disk;
localSinkRunner.sinkConfigString = sinkConfigString;
+ localSinkRunner.transformFunction = transformFunction;
+ localSinkRunner.transformFunctionClassName = transformFunctionClassName;
+ localSinkRunner.transformFunctionConfig = transformFunctionConfig;
localSinkRunner.processArguments();
@@ -539,6 +583,10 @@ public class TestCmdSinks {
testSinkConfig.setResources(new Resources(CPU + 1, RAM + 1, DISK + 1));
testSinkConfig.setConfigs(createSink.parseConfigs("{\"created_at-prime\":\"Mon Jul 02 00:33:15 +0000 2018\", \"otherConfigProperties\":{\"property1.value\":\"value1\",\"property2.value\":\"value2\"}}"));
+ // File values differ from the CLI arguments so the test verifies that each CLI option overrides the file.
+ testSinkConfig.setTransformFunction(TRANSFORM_FUNCTION + "-prime");
+ testSinkConfig.setTransformFunctionClassName(TRANSFORM_FUNCTION_CLASSNAME + "-prime");
+ testSinkConfig.setTransformFunctionConfig("{\"test_function_config\": \"prime\"}");
SinkConfig expectedSinkConfig = getSinkConfig();
@@ -563,6 +610,9 @@ public class TestCmdSinks {
RAM,
DISK,
SINK_CONFIG_STRING,
+ TRANSFORM_FUNCTION,
+ TRANSFORM_FUNCTION_CLASSNAME,
+ TRANSFORM_FUNCTION_CONFIG,
file.getAbsolutePath(),
expectedSinkConfig
);
@@ -583,6 +633,9 @@ public class TestCmdSinks {
Long ram,
Long disk,
String sinkConfigString,
+ String transformFunction,
+ String transformFunctionClassName,
+ String transformFunctionConfig,
String sinkConfigFile,
SinkConfig sinkConfig
) throws Exception {
@@ -602,6 +655,9 @@ public class TestCmdSinks {
createSink.ram = ram;
createSink.disk = disk;
createSink.sinkConfigString = sinkConfigString;
+ createSink.transformFunction = transformFunction;
+ createSink.transformFunctionClassName = transformFunctionClassName;
+ createSink.transformFunctionConfig = transformFunctionConfig;
createSink.sinkConfigFile = sinkConfigFile;
createSink.processArguments();
@@ -622,6 +678,9 @@ public class TestCmdSinks {
updateSink.ram = ram;
updateSink.disk = disk;
updateSink.sinkConfigString = sinkConfigString;
+ updateSink.transformFunction = transformFunction;
+ updateSink.transformFunctionClassName = transformFunctionClassName;
+ updateSink.transformFunctionConfig = transformFunctionConfig;
updateSink.sinkConfigFile = sinkConfigFile;
updateSink.processArguments();
@@ -642,6 +701,9 @@ public class TestCmdSinks {
localSinkRunner.ram = ram;
localSinkRunner.disk = disk;
localSinkRunner.sinkConfigString = sinkConfigString;
+ localSinkRunner.transformFunction = transformFunction;
+ localSinkRunner.transformFunctionClassName = transformFunctionClassName;
+ localSinkRunner.transformFunctionConfig = transformFunctionConfig;
localSinkRunner.sinkConfigFile = sinkConfigFile;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 71f0c154af6..3aefa6b2dca 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
public class InstanceConfig {
private int instanceId;
private String functionId;
+ private String transformFunctionId;
private String functionVersion;
private FunctionDetails functionDetails;
private int maxBufferedTuples;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 11e443e72e0..bd779e0fccc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,11 +22,14 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFuncti
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
@@ -42,16 +45,31 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
import org.apache.pulsar.functions.instance.state.InstanceStateManager;
import org.apache.pulsar.functions.instance.state.StateManager;
@@ -126,6 +144,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final Map<String, String> properties;
private final ClassLoader instanceClassLoader;
+ private final ClassLoader componentClassLoader;
private final ClassLoader functionClassLoader;
// a flog to determine if member variables have been initialized as part of setup().
@@ -135,6 +154,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// a read write lock for stats operations
private final ReadWriteLock statsLock = new ReentrantReadWriteLock();
+ private Class<?> sinkTypeArg;
+ private final AtomicReference<Schema<?>> sinkSchema = new AtomicReference<>();
+ private SinkSchemaInfoProvider sinkSchemaInfoProvider = null;
+
public JavaInstanceRunnable(InstanceConfig instanceConfig,
ClientBuilder clientBuilder,
PulsarClient pulsarClient,
@@ -143,7 +166,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
- ClassLoader functionClassLoader) throws PulsarClientException {
+ ClassLoader componentClassLoader,
+ ClassLoader transformFunctionClassLoader) throws PulsarClientException {
this.instanceConfig = instanceConfig;
this.clientBuilder = clientBuilder;
this.client = (PulsarClientImpl) pulsarClient;
@@ -151,7 +175,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.stateStorageImplClass = stateStorageImplClass;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
- this.functionClassLoader = functionClassLoader;
+ this.componentClassLoader = componentClassLoader;
+ this.functionClassLoader = transformFunctionClassLoader != null
+ ? transformFunctionClassLoader
+ : componentClassLoader;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
@@ -227,6 +254,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// start any log topic handler
setupLogHandler();
+ if (!(object instanceof IdentityFunction) && !(sink instanceof PulsarSink)) {
+ sinkSchemaInfoProvider = new SinkSchemaInfoProvider();
+ }
+
javaInstance = new JavaInstance(contextImpl, object, instanceConfig);
try {
Thread.currentThread().setContextClassLoader(functionClassLoader);
@@ -260,8 +291,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
Thread currentThread = Thread.currentThread();
Consumer<Throwable> asyncErrorHandler = throwable -> currentThread.interrupt();
- AsyncResultConsumer asyncResultConsumer =
- (record, javaExecutionResult) -> handleResult(record, javaExecutionResult);
+ AsyncResultConsumer asyncResultConsumer = this::handleResult;
while (true) {
currentRecord = readInput();
@@ -383,11 +413,18 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
- Thread.currentThread().setContextClassLoader(functionClassLoader);
+ Thread.currentThread().setContextClassLoader(componentClassLoader);
}
AbstractSinkRecord<?> sinkRecord;
if (output instanceof Record) {
- sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
+ Record record = (Record) output;
+ if (sinkSchemaInfoProvider != null) {
+ // Function and Sink coupled together so we need to encode with the Function Schema
+ // and decode with the Sink schema
+ sinkRecord = encodeWithRecordSchemaAndDecodeWithSinkSchema(srcRecord, record);
+ } else {
+ sinkRecord = new OutputRecordSinkRecord<>(srcRecord, record);
+ }
} else {
sinkRecord = new SinkRecord<>(srcRecord, output);
}
@@ -404,10 +441,50 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
}
+    /**
+     * Bridges a transform Function and a Sink running in the same instance: the Function's output value is
+     * encoded with the output record's own schema, then decoded with the schema derived from the Sink's
+     * generic type argument.
+     *
+     * @param srcRecord the input record that the Function processed
+     * @param record the record returned by the Function
+     * @return a sink record carrying the decoded value and the schema used to decode it
+     */
+    private OutputRecordSinkRecord encodeWithRecordSchemaAndDecodeWithSinkSchema(Record srcRecord, Record record) {
+        Schema encodingSchema = record.getSchema();
+        boolean isKeyValueSeparated = false;
+        if (encodingSchema instanceof KeyValueSchema) {
+            KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) encodingSchema;
+            // If the encoding is SEPARATED, it's easier to encode/decode with INLINE
+            // and rebuild the SEPARATED KeyValueSchema after decoding
+            if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+                encodingSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema());
+                isKeyValueSeparated = true;
+            }
+        }
+        byte[] encoded = encodingSchema.encode(record.getValue());
+
+        // Lazily build the sink schema from the first record; compareAndSet keeps whichever
+        // instance was installed first.
+        if (sinkSchema.get() == null) {
+            Schema<?> candidate = getSinkSchema(record, sinkTypeArg);
+            candidate.setSchemaInfoProvider(sinkSchemaInfoProvider);
+            sinkSchema.compareAndSet(null, candidate);
+        }
+        Schema<?> schema = sinkSchema.get();
+        // The version returned here is what decode() receives; presumably the provider maps it back to
+        // encodingSchema when the sink schema asks for the writer schema.
+        SchemaVersion schemaVersion = sinkSchemaInfoProvider.addSchemaIfNeeded(encodingSchema);
+        final byte[] schemaVersionBytes = schemaVersion.bytes();
+        Object decoded = schema.decode(encoded, schemaVersionBytes);
+
+        // AUTO_CONSUME resolves a concrete schema per version; hand that concrete schema to the sink.
+        if (schema instanceof AutoConsumeSchema) {
+            schema = ((AutoConsumeSchema) schema).getInternalSchema(schemaVersionBytes);
+        }
+
+        final Schema<?> finalSchema;
+        if (isKeyValueSeparated && schema instanceof KeyValueSchema) {
+            KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) schema;
+            finalSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema(),
+                    KeyValueEncodingType.SEPARATED);
+        } else {
+            finalSchema = schema;
+        }
+        return new OutputRecordSinkRecord(srcRecord, record, decoded, finalSchema);
+    }
+
private Record readInput() throws Exception {
Record record;
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
- Thread.currentThread().setContextClassLoader(functionClassLoader);
+ Thread.currentThread().setContextClassLoader(componentClassLoader);
}
try {
record = this.source.read();
@@ -446,7 +523,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (source != null) {
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
- Thread.currentThread().setContextClassLoader(functionClassLoader);
+ Thread.currentThread().setContextClassLoader(componentClassLoader);
}
try {
source.close();
@@ -461,7 +538,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (sink != null) {
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
- Thread.currentThread().setContextClassLoader(functionClassLoader);
+ Thread.currentThread().setContextClassLoader(componentClassLoader);
}
try {
sink.close();
@@ -769,7 +846,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
} else {
object = Reflections.createInstance(
sourceSpec.getClassName(),
- this.functionClassLoader);
+ this.componentClassLoader);
}
}
@@ -783,7 +860,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.source = (Source<?>) object;
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
- Thread.currentThread().setContextClassLoader(this.functionClassLoader);
+ Thread.currentThread().setContextClassLoader(this.componentClassLoader);
}
try {
if (sourceSpec.getConfigs().isEmpty()) {
@@ -847,17 +924,18 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
} else {
object = Reflections.createInstance(
sinkSpec.getClassName(),
- this.functionClassLoader);
+ this.componentClassLoader);
}
if (object instanceof Sink) {
this.sink = (Sink) object;
+ this.sinkTypeArg = TypeResolver.resolveRawArguments(Sink.class, object.getClass())[0];
} else {
throw new RuntimeException("Sink does not implement correct interface");
}
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
- Thread.currentThread().setContextClassLoader(this.functionClassLoader);
+ Thread.currentThread().setContextClassLoader(this.componentClassLoader);
}
try {
if (sinkSpec.getConfigs().isEmpty()) {
@@ -881,4 +959,98 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
+
+    /**
+     * Builds the schema used to decode the values received by a sink declared as {@code Sink<T>}.
+     *
+     * <p>The schema type is chosen by {@code getSchemaTypeOrDefault(record, clazz)}; this method only maps that
+     * type to a concrete {@link Schema} instance.
+     *
+     * @param record a record whose schema may decide the schema type
+     * @param clazz the resolved type argument of the sink
+     * @param <T> the sink's value type
+     * @return the schema matching the selected {@link SchemaType}
+     * @throws RuntimeException if the selected schema type has no mapping here
+     */
+    @SuppressWarnings("unchecked") // the built-in schemas below are assumed to produce values of type T
+    private static <T> Schema<T> getSinkSchema(Record<?> record, Class<T> clazz) {
+        SchemaType type = getSchemaTypeOrDefault(record, clazz);
+        switch (type) {
+            case NONE:
+                // raw bytes: ByteBuffer sinks get a ByteBuffer schema, every other byte type gets byte[]
+                if (ByteBuffer.class.isAssignableFrom(clazz)) {
+                    return (Schema<T>) Schema.BYTEBUFFER;
+                } else {
+                    return (Schema<T>) Schema.BYTES;
+                }
+
+            case AUTO_CONSUME:
+            case AUTO:
+                return (Schema<T>) Schema.AUTO_CONSUME();
+
+            case STRING:
+                return (Schema<T>) Schema.STRING;
+
+            case AVRO:
+                return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
+
+            case JSON:
+                return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
+
+            case KEY_VALUE:
+                return (Schema<T>) Schema.KV_BYTES();
+
+            case PROTOBUF:
+                return ProtobufSchema.ofGenericClass(clazz, new HashMap<>());
+
+            case PROTOBUF_NATIVE:
+                return ProtobufNativeSchema.ofGenericClass(clazz, new HashMap<>());
+
+            case AUTO_PUBLISH:
+                return (Schema<T>) Schema.AUTO_PRODUCE_BYTES();
+
+            default:
+                // separator added so the message reads "Unsupported schema type: X" instead of "...typeX"
+                throw new RuntimeException("Unsupported schema type: " + type);
+        }
+    }
+
+    /**
+     * Picks the schema type for a sink whose value type is {@code clazz}.
+     *
+     * <p>Generic objects always use automatic schema detection, and raw byte types always use {@code NONE}.
+     * Otherwise the record's own schema type is used, unless the record has no schema or its type is
+     * {@code NONE}; in that case the type is derived from {@code clazz} alone.
+     */
+    private static SchemaType getSchemaTypeOrDefault(Record<?> record, Class<?> clazz) {
+        if (GenericObject.class.isAssignableFrom(clazz)) {
+            return SchemaType.AUTO_CONSUME;
+        }
+        boolean rawBytes = byte[].class.equals(clazz)
+                || ByteBuf.class.equals(clazz)
+                || ByteBuffer.class.equals(clazz);
+        if (rawBytes) {
+            // if sink uses bytes, we should ignore the record schema
+            return SchemaType.NONE;
+        }
+        Schema<?> recordSchema = record.getSchema();
+        boolean useRecordSchema = recordSchema != null
+                && recordSchema.getSchemaInfo().getType() != SchemaType.NONE;
+        return useRecordSchema ? recordSchema.getSchemaInfo().getType() : getDefaultSchemaType(clazz);
+    }
+
+    /**
+     * Derives a schema type from the sink's value type alone; used when the record carries no usable schema.
+     */
+    private static SchemaType getDefaultSchemaType(Class<?> clazz) {
+        if (byte[].class.equals(clazz) || ByteBuf.class.equals(clazz) || ByteBuffer.class.equals(clazz)) {
+            return SchemaType.NONE;
+        }
+        if (GenericObject.class.isAssignableFrom(clazz)) {
+            // the sink takes a generic record/object, so detect the schema automatically
+            return SchemaType.AUTO_CONSUME;
+        }
+        if (String.class.equals(clazz)) {
+            return SchemaType.STRING;
+        }
+        if (isProtobufClass(clazz)) {
+            return SchemaType.PROTOBUF;
+        }
+        if (KeyValue.class.equals(clazz)) {
+            return SchemaType.KEY_VALUE;
+        }
+        // any other type is handled as a POJO with a JSON schema
+        return SchemaType.JSON;
+    }
+
+    /**
+     * Returns whether {@code pojoClazz} extends protobuf's {@code GeneratedMessageV3}.
+     *
+     * <p>The base class is looked up with the one-argument {@code Class.forName}, i.e. through the class
+     * loader of this class. If it cannot be found there, the answer is {@code false}.
+     */
+    private static boolean isProtobufClass(Class<?> pojoClazz) {
+        try {
+            Class<?> generatedMessageBase = Class.forName("com.google.protobuf.GeneratedMessageV3");
+            boolean isProtobuf = generatedMessageBase.isAssignableFrom(pojoClazz);
+            return isProtobuf;
+        } catch (ClassNotFoundException | NoClassDefFoundError e) {
+            // protobuf is not on the classpath, so the sink type cannot be a protobuf message
+            return false;
+        }
+    }
+
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
index 5b63c6e8896..233100cd2ee 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
@@ -31,10 +31,21 @@ import org.apache.pulsar.functions.api.Record;
public class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
private final Record<T> sinkRecord;
+ private final T value;
+ private final Schema<T> schema;
+ /**
+ * Wraps {@code sinkRecord}, taking the value from {@code sinkRecord.getValue()} and the schema from
+ * {@code getRecordSchema(sinkRecord)}; both are captured once, at construction time.
+ */
OutputRecordSinkRecord(Record<T> sourceRecord, Record<T> sinkRecord) {
super(sourceRecord);
this.sinkRecord = sinkRecord;
+ this.value = sinkRecord.getValue();
+ this.schema = getRecordSchema(sinkRecord);
+ }
+
+ /**
+ * Wraps {@code sinkRecord} but stores the given {@code value} and {@code schema} instead of deriving them
+ * from {@code sinkRecord}.
+ */
+ OutputRecordSinkRecord(Record<T> sourceRecord, Record<T> sinkRecord, T value, Schema<T> schema) {
+ super(sourceRecord);
+ this.sinkRecord = sinkRecord;
+ this.value = value;
+ this.schema = schema;
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java
new file mode 100644
index 00000000000..1bd41f226d1
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * SchemaInfo provider that creates a new schema version for each new schema hash.
+ *
+ * <p>Versions are assigned locally from a counter starting at 1. Each version is the 8-byte big-endian
+ * encoding of its counter value. All lookups are served from in-memory maps, and
+ * {@link #getTopicName()} returns the fixed name {@code "__INTERNAL__"}.
+ */
+class SinkSchemaInfoProvider implements SchemaInfoProvider {
+
+    // last counter value handed out; 0 means no schema has been registered yet
+    final AtomicLong latestVersion = new AtomicLong(0);
+    final ConcurrentHashMap<SchemaVersion, SchemaInfo> schemaInfos = new ConcurrentHashMap<>();
+    final ConcurrentHashMap<SchemaHash, SchemaVersion> schemaVersions = new ConcurrentHashMap<>();
+
+    /**
+     * Creates a new schema version with the info of the provided schema if the hash of the schema is a new one.
+     *
+     * @param schema schema for which we create a version
+     * @return the version of the schema (an existing one if this schema hash was already registered)
+     */
+    public SchemaVersion addSchemaIfNeeded(Schema<?> schema) {
+        SchemaHash schemaHash = SchemaHash.of(schema);
+        return schemaVersions.computeIfAbsent(schemaHash, s -> createNewSchemaInfo(schema.getSchemaInfo()));
+    }
+
+    // Synchronized together with getLatestSchema() so a reader can never observe the incremented counter
+    // before the matching SchemaInfo has been stored in schemaInfos.
+    private synchronized SchemaVersion createNewSchemaInfo(SchemaInfo schemaInfo) {
+        SchemaVersion schemaVersion = toSchemaVersion(latestVersion.incrementAndGet());
+        schemaInfos.put(schemaVersion, schemaInfo);
+        return schemaVersion;
+    }
+
+    @Override
+    public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
+        return CompletableFuture.completedFuture(schemaInfos.get(BytesSchemaVersion.of(schemaVersion)));
+    }
+
+    /**
+     * Returns the most recently registered schema info, or a future completed with {@code null} when no
+     * schema has been registered yet.
+     */
+    @Override
+    public synchronized CompletableFuture<SchemaInfo> getLatestSchema() {
+        return CompletableFuture.completedFuture(schemaInfos.get(toSchemaVersion(latestVersion.get())));
+    }
+
+    @Override
+    public String getTopicName() {
+        return "__INTERNAL__";
+    }
+
+    // Encodes a counter value as the 8-byte big-endian array used as the version key. Writer and readers
+    // share this single encoding so their lookups always agree.
+    private static SchemaVersion toSchemaVersion(long version) {
+        return BytesSchemaVersion.of(ByteBuffer.allocate(Long.BYTES).putLong(version).array());
+    }
+}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 5029a064311..0f8385fc6fa 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -79,7 +79,7 @@ public class JavaInstanceRunnableTest {
when(clientBuilder.build()).thenReturn(null);
InstanceConfig config = createInstanceConfig(functionDetails);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- config, clientBuilder, null, null, null, null, null, null, null);
+ config, clientBuilder, null, null, null, null, null, null, null, null);
return javaInstanceRunnable;
}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
index 55adf848da5..512c583e301 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
@@ -38,6 +38,7 @@ public class RecordFunction implements Function<String, Record<String>> {
return context.newOutputRecordBuilder(Schema.STRING)
.destinationTopic(publishTopic)
.value(output)
+ .schema(Schema.STRING)
.properties(properties)
.build();
}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 2b9b25ed743..6c7551f8ac3 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -441,7 +441,7 @@ public class LocalRunner implements AutoCloseable {
if (builtInSinkClassLoader != null) {
functionDetails = SinkConfigUtils.convert(
sinkConfig, SinkConfigUtils.validateAndExtractDetails(
- sinkConfig, builtInSinkClassLoader, true));
+ sinkConfig, builtInSinkClassLoader, null, true));
userCodeClassLoader = builtInSinkClassLoader;
} else if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
@@ -449,7 +449,8 @@ public class LocalRunner implements AutoCloseable {
Function.FunctionDetails.ComponentType.SINK,
sinkConfig.getClassName(), file, narExtractionDirectory);
functionDetails = SinkConfigUtils.convert(
- sinkConfig, SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, true));
+ sinkConfig,
+ SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null, true));
userCodeClassLoader = sinkClassLoader;
userCodeClassLoaderCreated = true;
} else if (userCodeFile != null) {
@@ -461,7 +462,8 @@ public class LocalRunner implements AutoCloseable {
Function.FunctionDetails.ComponentType.SINK,
sinkConfig.getClassName(), file, narExtractionDirectory);
functionDetails = SinkConfigUtils.convert(
- sinkConfig, SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, true));
+ sinkConfig,
+ SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null, true));
userCodeClassLoader = sinkClassLoader;
userCodeClassLoaderCreated = true;
} else {
@@ -470,7 +472,7 @@ public class LocalRunner implements AutoCloseable {
}
functionDetails = SinkConfigUtils.convert(
sinkConfig, SinkConfigUtils.validateAndExtractDetails(
- sinkConfig, Thread.currentThread().getContextClassLoader(), true));
+ sinkConfig, Thread.currentThread().getContextClassLoader(), null, true));
}
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
@@ -576,6 +578,8 @@ public class LocalRunner implements AutoCloseable {
instanceConfig,
userCodeFile,
null,
+ null,
+ null,
runtimeFactory,
instanceLivenessCheck);
spawners.add(runtimeSpawner);
@@ -680,6 +684,8 @@ public class LocalRunner implements AutoCloseable {
instanceConfig,
userCodeFile,
null,
+ null,
+ null,
runtimeFactory,
instanceLivenessCheck);
spawners.add(runtimeSpawner);
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 09ff9cfc8eb..e67899abd22 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -211,6 +211,7 @@ message FunctionMetaData {
uint64 createTime = 4;
map<int32, FunctionState> instanceStates = 5;
FunctionAuthenticationSpec functionAuthSpec = 6;
+ PackageLocationMetaData transformFunctionPackageLocation = 7;
}
message FunctionAuthenticationSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index e65a8e254e2..3978e0f34fd 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -61,6 +61,12 @@ public class JavaInstanceStarter implements AutoCloseable {
listConverter = StringConverter.class)
public String jarFile;
+ @Parameter(
+ names = "--transform_function_jar",
+ description = "Path to Transform Function Jar\n",
+ listConverter = StringConverter.class)
+ public String transformFunctionJarFile;
+
@Parameter(names = "--instance_id", description = "Instance Id\n", required = true)
public int instanceId;
@@ -73,6 +79,9 @@ public class JavaInstanceStarter implements AutoCloseable {
@Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
public String pulsarServiceUrl;
+ @Parameter(names = "--transform_function_id", description = "Transform Function Id\n")
+ public String transformFunctionId;
+
@Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n")
public String clientAuthenticationPlugin;
@@ -157,6 +166,7 @@ public class JavaInstanceStarter implements AutoCloseable {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionId(functionId);
+ instanceConfig.setTransformFunctionId(transformFunctionId);
instanceConfig.setFunctionVersion(functionVersion);
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
@@ -220,6 +230,8 @@ public class JavaInstanceStarter implements AutoCloseable {
instanceConfig,
jarFile,
null, // we really dont use this in thread container
+ transformFunctionJarFile,
+ null, // we really dont use this in thread container
containerFactory,
expectedHealthCheckInterval * 1000);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 208420957cc..0ee2c430a72 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -52,6 +52,7 @@ public interface RuntimeFactory extends AutoCloseable {
*/
Runtime createContainer(
InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
+ String transformFunctionFile, String originalTransformFunctionFileName,
Long expectedHealthCheckInterval) throws Exception;
default boolean externallyManaged() {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index a2b7bf4c254..4f9e9b672cb 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -57,6 +57,8 @@ public class RuntimeSpawner implements AutoCloseable {
public RuntimeSpawner(InstanceConfig instanceConfig,
String codeFile,
String originalCodeFileName,
+ String transformFunctionFile,
+ String originalTransformFunctionFileName,
RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) {
this.instanceConfig = instanceConfig;
this.runtimeFactory = containerFactory;
@@ -65,6 +67,7 @@ public class RuntimeSpawner implements AutoCloseable {
this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
try {
this.runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile, originalCodeFileName,
+ transformFunctionFile, originalTransformFunctionFileName,
instanceLivenessCheckFreqMs / 1000);
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index a1101e80c95..e5baa7ae967 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.runtime;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.util.JsonFormat;
import io.prometheus.client.hotspot.BufferPoolsExports;
@@ -36,6 +37,7 @@ import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -66,6 +68,7 @@ public class RuntimeUtils {
String extraDependenciesDir, /* extra dependencies for running instances */
String logDirectory,
String originalCodeFileName,
+ String originalTransformFunctionFileName,
String pulsarServiceUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
@@ -85,7 +88,7 @@ public class RuntimeUtils {
final List<String> cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir);
cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory,
- originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
+ originalCodeFileName, originalTransformFunctionFileName, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, shardId, grpcPort, expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig,
installUserCodeDependencies, pythonDependencyRepository,
@@ -262,6 +265,7 @@ public class RuntimeUtils {
String extraDependenciesDir, /* extra dependencies for running instances */
String logDirectory,
String originalCodeFileName,
+ String originalTransformFunctionFileName,
String pulsarServiceUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
@@ -332,9 +336,7 @@ public class RuntimeUtils {
}
if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
- for (String runtimeFlagArg : splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
- args.add(runtimeFlagArg);
- }
+ Collections.addAll(args, splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags()));
}
if (instanceConfig.getFunctionDetails().getResources() != null) {
Function.Resources resources = instanceConfig.getFunctionDetails().getResources();
@@ -346,12 +348,16 @@ public class RuntimeUtils {
args.add("--jar");
args.add(originalCodeFileName);
+ if (isNotEmpty(originalTransformFunctionFileName)) {
+ args.add("--transform_function_jar");
+ args.add(originalTransformFunctionFileName);
+ args.add("--transform_function_id");
+ args.add(instanceConfig.getTransformFunctionId());
+ }
} else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
args.add("python3");
if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
- for (String runtimeFlagArg : splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
- args.add(runtimeFlagArg);
- }
+ Collections.addAll(args, splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags()));
}
args.add(instanceFile);
args.add("--py");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 50dc975075f..f7791e716d9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.functions.runtime.kubernetes;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.commons.lang3.StringUtils.left;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
@@ -74,7 +75,6 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -139,6 +139,7 @@ public class KubernetesRuntime implements Runtime {
private final String configAdminCLI;
private final String userCodePkgUrl;
private final String originalCodeFileName;
+ private final String originalTransformFunctionFileName;
private final String pulsarAdminUrl;
private final SecretsProviderConfigurator secretsProviderConfigurator;
private int percentMemoryPadding;
@@ -173,6 +174,7 @@ public class KubernetesRuntime implements Runtime {
String configAdminCLI,
String userCodePkgUrl,
String originalCodeFileName,
+ String originalTransformFunctionFileName,
String pulsarServiceUrl,
String pulsarAdminUrl,
String stateStorageServiceUrl,
@@ -203,8 +205,11 @@ public class KubernetesRuntime implements Runtime {
this.configAdminCLI = configAdminCLI;
this.userCodePkgUrl = userCodePkgUrl;
this.downloadDirectory =
- StringUtils.isNotEmpty(downloadDirectory) ? downloadDirectory : this.pulsarRootDir; // for backward comp
+ isNotEmpty(downloadDirectory) ? downloadDirectory : this.pulsarRootDir; // for backward comp
this.originalCodeFileName = this.downloadDirectory + "/" + originalCodeFileName;
+ this.originalTransformFunctionFileName = isNotEmpty(originalTransformFunctionFileName)
+ ? this.downloadDirectory + "/" + originalTransformFunctionFileName
+ : originalTransformFunctionFileName;
this.pulsarAdminUrl = pulsarAdminUrl;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.percentMemoryPadding = percentMemoryPadding;
@@ -263,6 +268,7 @@ public class KubernetesRuntime implements Runtime {
extraDependenciesDir,
logDirectory,
this.originalCodeFileName,
+ this.originalTransformFunctionFileName,
pulsarServiceUrl,
stateStorageServiceUrl,
authConfig,
@@ -844,49 +850,29 @@ public class KubernetesRuntime implements Runtime {
}
+ /**
+ * Builds the container command as {@code sh -c "<steps>"}. The steps are joined with {@code &&}:
+ * download the function package to {@code originalCodeFileName}; optionally download the transform
+ * function package to {@code originalTransformFunctionFileName} (only when that name is non-empty); run the
+ * command from {@code setShardIdEnvironmentVariableCommand()}; then run {@code processArgs}.
+ */
protected List<String> getExecutorCommand() {
- return Arrays.asList(
- "sh",
- "-c",
- String.join(" ", getDownloadCommand(instanceConfig.getFunctionDetails(), originalCodeFileName))
- + " && " + setShardIdEnvironmentVariableCommand()
- + " && " + String.join(" ", processArgs)
- );
+ List<String> cmds =
+ new ArrayList<>(getDownloadCommand(instanceConfig.getFunctionDetails(), originalCodeFileName, false));
+ // fetch the transform function package only when a transform function file name was configured
+ if (isNotEmpty(originalTransformFunctionFileName)) {
+ cmds.add("&&");
+ cmds.addAll(getDownloadCommand(instanceConfig.getFunctionDetails(),
+ originalTransformFunctionFileName, true));
+ }
+ cmds.add("&&");
+ cmds.add(setShardIdEnvironmentVariableCommand());
+ cmds.add("&&");
+ cmds.addAll(processArgs);
+ // all tokens are space-joined into the single script argument passed to "sh -c"
+ return Arrays.asList("sh", "-c", String.join(" ", cmds));
}
- private List<String> getDownloadCommand(Function.FunctionDetails functionDetails, String userCodeFilePath) {
+ /**
+ * Builds the download command for the function identified by the tenant, namespace and name in
+ * {@code functionDetails}, writing the package to {@code userCodeFilePath}.
+ *
+ * @param transformFunction passed through to the tenant/namespace/name overload; when true that overload
+ * appends {@code --transform-function} to the command
+ */
+ private List<String> getDownloadCommand(Function.FunctionDetails functionDetails, String userCodeFilePath,
+ boolean transformFunction) {
return getDownloadCommand(functionDetails.getTenant(), functionDetails.getNamespace(),
- functionDetails.getName(), userCodeFilePath);
+ functionDetails.getName(), userCodeFilePath, transformFunction);
}
- private List<String> getDownloadCommand(String tenant, String namespace, String name, String userCodeFilePath) {
-
- // add auth plugin and parameters if necessary
- if (authenticationEnabled && authConfig != null) {
- if (isNotBlank(authConfig.getClientAuthenticationPlugin())
- && isNotBlank(authConfig.getClientAuthenticationParameters())
- && instanceConfig.getFunctionAuthenticationSpec() != null) {
- return Arrays.asList(
- pulsarRootDir + configAdminCLI,
- "--auth-plugin",
- authConfig.getClientAuthenticationPlugin(),
- "--auth-params",
- authConfig.getClientAuthenticationParameters(),
- "--admin-url",
- pulsarAdminUrl,
- "functions",
- "download",
- "--tenant",
- tenant,
- "--namespace",
- namespace,
- "--name",
- name,
- "--destination-file",
- userCodeFilePath);
- }
- }
-
- return Arrays.asList(
+ private List<String> getDownloadCommand(String tenant, String namespace, String name, String userCodeFilePath,
+ boolean transformFunction) {
+ ArrayList<String> cmd = new ArrayList<>(Arrays.asList(
pulsarRootDir + configAdminCLI,
"--admin-url",
pulsarAdminUrl,
@@ -899,7 +885,26 @@ public class KubernetesRuntime implements Runtime {
"--name",
name,
"--destination-file",
- userCodeFilePath);
+ userCodeFilePath));
+
+ // add auth plugin and parameters if necessary
+ if (authenticationEnabled && authConfig != null) {
+ if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+ && isNotBlank(authConfig.getClientAuthenticationParameters())
+ && instanceConfig.getFunctionAuthenticationSpec() != null) {
+ cmd.addAll(Arrays.asList(
+ "--auth-plugin",
+ authConfig.getClientAuthenticationPlugin(),
+ "--auth-params",
+ authConfig.getClientAuthenticationParameters()));
+ }
+ }
+
+ if (transformFunction) {
+ cmd.add("--transform-function");
+ }
+
+ return cmd;
}
private static String setShardIdEnvironmentVariableCommand() {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index aa000e789c2..740d6cc0fdc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -269,6 +269,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
@Override
public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String codePkgUrl,
String originalCodeFileName,
+ String transformFunctionPkgUrl,
+ String originalTransformFunctionFileName,
Long expectedHealthCheckInterval) throws Exception {
String instanceFile = null;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
@@ -327,6 +329,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
configAdminCLI,
codePkgUrl,
originalCodeFileName,
+ originalTransformFunctionFileName,
pulsarServiceUrl,
pulsarAdminUrl,
stateStorageServiceUri,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
index 3c17633ea61..678296eeec0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
@@ -82,6 +82,7 @@ class ProcessRuntime implements Runtime {
String narExtractionDirectory,
String logDirectory,
String codeFile,
+ String transformFunctionFile,
String pulsarServiceUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
@@ -133,6 +134,7 @@ class ProcessRuntime implements Runtime {
? extraDependenciesDir : null,
logDirectory,
codeFile,
+ transformFunctionFile,
pulsarServiceUrl,
stateStorageServiceUrl,
authConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index 36e09bafd34..2324d0ee441 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -193,6 +193,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
@Override
public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile,
String originalCodeFileName,
+ String transformFunctionFile,
+ String originalTransformFunctionFileName,
Long expectedHealthCheckInterval) throws Exception {
String instanceFile = null;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
@@ -223,6 +225,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
narExtractionDirectory,
logDirectory,
codeFile,
+ transformFunctionFile,
pulsarServiceUrl,
stateStorageServiceUrl,
authConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index f2aa1ca6f98..d1b53673209 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -64,6 +64,7 @@ public class ThreadRuntime implements Runtime {
private final ThreadGroup threadGroup;
private final FunctionCacheManager fnCache;
private final String jarFile;
+ private final String transformFunctionFile;
private final ClientBuilder clientBuilder;
private final PulsarClient pulsarClient;
private final PulsarAdmin pulsarAdmin;
@@ -79,6 +80,7 @@ public class ThreadRuntime implements Runtime {
FunctionCacheManager fnCache,
ThreadGroup threadGroup,
String jarFile,
+ String transformFunctionFile,
PulsarClient client,
ClientBuilder clientBuilder,
PulsarAdmin pulsarAdmin,
@@ -97,6 +99,7 @@ public class ThreadRuntime implements Runtime {
this.threadGroup = threadGroup;
this.fnCache = fnCache;
this.jarFile = jarFile;
+ this.transformFunctionFile = transformFunctionFile;
this.clientBuilder = clientBuilder;
this.pulsarClient = client;
this.pulsarAdmin = pulsarAdmin;
@@ -110,14 +113,15 @@ public class ThreadRuntime implements Runtime {
}
private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig,
+ String functionId,
String jarFile,
String narExtractionDirectory,
FunctionCacheManager fnCache,
Optional<ConnectorsManager> connectorsManager,
- Optional<FunctionsManager> functionsManager) throws Exception {
- if (FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails())) {
- Function.FunctionDetails.ComponentType componentType =
- InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
+ Optional<FunctionsManager> functionsManager,
+ Function.FunctionDetails.ComponentType componentType)
+ throws Exception {
+ if (FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails(), componentType)) {
if (componentType == Function.FunctionDetails.ComponentType.FUNCTION && functionsManager.isPresent()) {
return functionsManager.get()
.getFunction(instanceConfig.getFunctionDetails().getBuiltin())
@@ -134,11 +138,12 @@ public class ThreadRuntime implements Runtime {
.getClassLoader();
}
}
- return loadJars(jarFile, instanceConfig, narExtractionDirectory, fnCache);
+ return loadJars(jarFile, instanceConfig, functionId, narExtractionDirectory, fnCache);
}
private static ClassLoader loadJars(String jarFile,
InstanceConfig instanceConfig,
+ String functionId,
String narExtractionDirectory,
FunctionCacheManager fnCache) throws Exception {
if (jarFile == null) {
@@ -151,7 +156,7 @@ public class ThreadRuntime implements Runtime {
log.info("Trying Loading file as NAR file: {}", jarFile);
// Let's first try to treat it as a nar archive
fnCache.registerFunctionInstanceWithArchive(
- instanceConfig.getFunctionId(),
+ functionId,
instanceConfig.getInstanceName(),
jarFile, narExtractionDirectory);
loadedAsNar = true;
@@ -165,16 +170,16 @@ public class ThreadRuntime implements Runtime {
log.info("Load file as simple JAR file: {}", jarFile);
// create the function class loader
fnCache.registerFunctionInstance(
- instanceConfig.getFunctionId(),
+ functionId,
instanceConfig.getInstanceName(),
Arrays.asList(jarFile),
Collections.emptyList());
}
log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
- instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(instanceConfig.getFunctionId()));
+ instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(functionId));
- fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
+ fnClassLoader = fnCache.getClassLoader(functionId);
if (null == fnClassLoader) {
throw new Exception("No function class loader available.");
}
@@ -190,8 +195,13 @@ public class ThreadRuntime implements Runtime {
// extract class loader for function
ClassLoader functionClassLoader =
- getFunctionClassLoader(instanceConfig, jarFile, narExtractionDirectory, fnCache, connectorsManager,
- functionsManager);
+ getFunctionClassLoader(instanceConfig, instanceConfig.getFunctionId(), jarFile, narExtractionDirectory,
+ fnCache, connectorsManager, functionsManager,
+ InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails()));
+
+ ClassLoader transformFunctionClassLoader = transformFunctionFile == null ? null : getFunctionClassLoader(
+ instanceConfig, instanceConfig.getTransformFunctionId(), transformFunctionFile, narExtractionDirectory,
+ fnCache, connectorsManager, functionsManager, Function.FunctionDetails.ComponentType.FUNCTION);
// re-initialize JavaInstanceRunnable so that variables in constructor can be re-initialized
this.javaInstanceRunnable = new JavaInstanceRunnable(
@@ -203,7 +213,8 @@ public class ThreadRuntime implements Runtime {
stateStorageServiceUrl,
secretsProvider,
collectorRegistry,
- functionClassLoader);
+ functionClassLoader,
+ transformFunctionClassLoader);
log.info("ThreadContainer starting function with instanceId {} functionId {} namespace {}",
instanceConfig.getInstanceId(),
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index da8a9bb14dd..64ea2d0f172 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -177,6 +177,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
@Override
public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
String originalCodeFileName,
+ String transformFunctionFile,
+ String originalTransformFunctionFileName,
Long expectedHealthCheckInterval) {
SecretsProvider secretsProvider = defaultSecretsProvider;
if (secretsProvider == null) {
@@ -196,6 +198,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
fnCache,
threadGroup,
jarFile,
+ transformFunctionFile,
pulsarClient,
clientBuilder,
pulsarAdmin,
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index 1651a7ba186..f7cdf707d32 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -191,6 +191,7 @@ public class RuntimeUtilsTest {
"extraDependenciesDir", /* extra dependencies for running instances */
"logDirectory",
"originalCodeFileName",
+ "originalExtraFileName",
"pulsarServiceUrl",
"stateStorageServiceUrl",
AuthenticationConfig.builder().build(),
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index df5c369303c..33451a316e4 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -284,6 +284,7 @@ public class KubernetesRuntimeTest {
config.setFunctionDetails(createFunctionDetails(runtime, addSecrets));
config.setFunctionId(java.util.UUID.randomUUID().toString());
+ config.setTransformFunctionId(java.util.UUID.randomUUID().toString());
config.setFunctionVersion("1.0");
config.setInstanceId(0);
config.setMaxBufferedTuples(1024);
@@ -304,7 +305,7 @@ public class KubernetesRuntimeTest {
factory = createKubernetesRuntimeFactory(null, percentMemoryPadding, 1.0, 1.0);
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
Function.Resources resources = Function.Resources.newBuilder().setRam(ram).build();
@@ -364,7 +365,7 @@ public class KubernetesRuntimeTest {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
factory = createKubernetesRuntimeFactory(null, 10, cpuOverCommitRatio, memoryOverCommitRatio);
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
List<String> args = container.getProcessArgs();
// check padding and xmx
@@ -387,7 +388,7 @@ public class KubernetesRuntimeTest {
List<String> args;
try (MockedStatic<SystemUtils> systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) {
systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true);
- container = factory.createContainer(config, userJarFile, userJarFile, 30L);
+ container = factory.createContainer(config, userJarFile, userJarFile, userJarFile, userJarFile, 30L);
args = container.getProcessArgs();
}
@@ -400,14 +401,14 @@ public class KubernetesRuntimeTest {
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
- totalArgs = 42;
- portArg = 29;
- metricsPortArg = 31;
+ totalArgs = 46;
+ portArg = 33;
+ metricsPortArg = 35;
} else {
extraDepsEnv = "";
- portArg = 28;
- metricsPortArg = 30;
- totalArgs = 41;
+ portArg = 32;
+ metricsPortArg = 34;
+ totalArgs = 45;
}
if (secretsAttached) {
totalArgs += 4;
@@ -441,8 +442,11 @@ public class KubernetesRuntimeTest {
+ " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " -Xmx" + RESOURCES.getRam()
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
- + " --jar " + jarLocation + " --instance_id "
- + "$SHARD_ID" + " --function_id " + config.getFunctionId()
+ + " --jar " + jarLocation
+ + " --transform_function_jar " + jarLocation
+ + " --transform_function_id " + config.getTransformFunctionId()
+ + " --instance_id " + "$SHARD_ID"
+ + " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
@@ -491,7 +495,7 @@ public class KubernetesRuntimeTest {
}
private void verifyPythonInstance(InstanceConfig config, String extraDepsDir, boolean secretsAttached) throws Exception {
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
List<String> args = container.getProcessArgs();
int totalArgs;
@@ -582,7 +586,7 @@ public class KubernetesRuntimeTest {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
config.setFunctionDetails(functionDetails);
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
V1StatefulSet spec = container.createStatefulSet();
assertEquals(spec.getMetadata().getLabels().get("tenant"), "tenant");
@@ -741,7 +745,7 @@ public class KubernetesRuntimeTest {
factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
V1Service serviceSpec = container.createService();
assertEquals(serviceSpec.getMetadata().getNamespace(), "default");
@@ -760,7 +764,7 @@ public class KubernetesRuntimeTest {
factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.of(new BasicKubernetesManifestCustomizer()));
verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
V1StatefulSet spec = container.createStatefulSet();
assertEquals(spec.getMetadata().getAnnotations().get("annotation"), "test");
assertEquals(spec.getMetadata().getLabels().get("label"), "test");
@@ -801,7 +805,7 @@ public class KubernetesRuntimeTest {
factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.of(new TestKubernetesCustomManifestCustomizer()));
verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
V1StatefulSet spec = container.createStatefulSet();
assertEquals(spec.getSpec().getTemplate().getSpec().getServiceAccountName(), "my-service-account");
}
@@ -830,7 +834,7 @@ public class KubernetesRuntimeTest {
}
private void verifyGolangInstance(InstanceConfig config) throws Exception {
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
List<String> args = container.getProcessArgs();
int totalArgs = 8;
@@ -1007,7 +1011,7 @@ public class KubernetesRuntimeTest {
"org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer", configs);
verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
V1StatefulSet spec = container.createStatefulSet();
assertEquals(spec.getMetadata().getAnnotations().get("annotation"), "test");
assertEquals(spec.getMetadata().getLabels().get("label"), "test");
@@ -1055,7 +1059,7 @@ public class KubernetesRuntimeTest {
"org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer", configs);
verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
V1StatefulSet spec = container.createStatefulSet();
assertEquals(spec.getMetadata().getAnnotations().get("annotation"), "test");
assertEquals(spec.getMetadata().getLabels().get("label"), "test");
@@ -1141,7 +1145,7 @@ public class KubernetesRuntimeTest {
new DefaultSecretsProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
Mockito.mock(FunctionsManager.class), Optional.empty(), Optional.empty());
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);
- KubernetesRuntime container = kubernetesRuntimeFactory.createContainer(config, userJarFile, userJarFile, 30l);
+ KubernetesRuntime container = kubernetesRuntimeFactory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
V1PodTemplateSpec template = container.createStatefulSet().getSpec().getTemplate();
Map<String, String> annotations =
template.getMetadata().getAnnotations();
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 6d80ec871b6..ea1d6bbc629 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -289,16 +289,16 @@ public class ProcessRuntimeTest {
List<String> args;
try (MockedStatic<SystemUtils> systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) {
systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true);
- ProcessRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30L);
+ ProcessRuntime container = factory.createContainer(config, userJarFile, userJarFile,
+ userJarFile, userJarFile, 30L);
args = container.getProcessArgs();
}
-
String classpath = javaInstanceJarFile;
String extraDepsEnv;
int portArg;
int metricsPortArg;
- int totalArgCount = 44;
+ int totalArgCount = 48;
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
totalArgCount += 3;
}
@@ -306,13 +306,13 @@ public class ProcessRuntimeTest {
assertEquals(args.size(), totalArgCount);
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
- portArg = 27;
- metricsPortArg = 29;
+ portArg = 31;
+ metricsPortArg = 33;
} else {
assertEquals(args.size(), totalArgCount-1);
extraDepsEnv = "";
- portArg = 26;
- metricsPortArg = 28;
+ portArg = 30;
+ metricsPortArg = 32;
}
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
portArg += 3;
@@ -331,8 +331,11 @@ public class ProcessRuntimeTest {
+ " -Dio.netty.tryReflectionSetAccessible=true"
+ " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
- + " --jar " + userJarFile + " --instance_id "
- + config.getInstanceId() + " --function_id " + config.getFunctionId()
+ + " --jar " + userJarFile
+ + " --transform_function_jar " + userJarFile
+ + " --transform_function_id " + config.getTransformFunctionId()
+ + " --instance_id " + config.getInstanceId()
+ + " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
@@ -368,7 +371,7 @@ public class ProcessRuntimeTest {
}
private void verifyPythonInstance(InstanceConfig config, String extraDepsDir) throws Exception {
- ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
+ ProcessRuntime container = factory.createContainer(config, userJarFile, null, null, null, 30L);
List<String> args = container.getProcessArgs();
int totalArgs = 36;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 49b71afcad7..54b23d5200b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowFunction;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -103,42 +104,85 @@ public class FunctionCommon {
return getFunctionTypes(functionClass, isWindowConfigPresent);
}
- public static Class<?>[] getFunctionTypes(Class userClass, boolean isWindowConfigPresent) {
- Class<?>[] typeArgs;
+ /**
+ * Resolves the input and output classes of a user function. A {@code Record<T>} output is
+ * unwrapped to {@code T}; for a window function implemented as {@code java.util.function.Function},
+ * the {@code Collection<T>} input is unwrapped to {@code T}.
+ */
+ public static Class<?>[] getFunctionTypes(Class<?> userClass, boolean isWindowConfigPresent) {
+ Class<?> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
+ Class<?>[] typeArgs = TypeResolver.resolveRawArguments(classParent, userClass);
// if window function
if (isWindowConfigPresent) {
- if (WindowFunction.class.isAssignableFrom(userClass)) {
- typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(WindowFunction.class, userClass);
- } else {
- typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(java.util.function.Function.class, userClass);
+ if (classParent.equals(java.util.function.Function.class)) {
if (!typeArgs[0].equals(Collection.class)) {
throw new IllegalArgumentException("Window function must take a collection as input");
}
- Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, userClass);
- Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
- Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
- typeArgs[0] = (Class<?>) actualInputType;
+ typeArgs[0] = (Class<?>) unwrapType(classParent, userClass, 0);
+ }
+ }
+ if (typeArgs[1].equals(Record.class)) {
+ typeArgs[1] = (Class<?>) unwrapType(classParent, userClass, 1);
+ }
+
+ return typeArgs;
+ }
+
+ /**
+ * Resolves the raw type arguments of the function interface implemented by {@code userClass},
+ * without unwrapping {@code Record<T>} or {@code Collection<T>} arguments.
+ */
+ public static Class<?>[] getRawFunctionTypes(Class<?> userClass, boolean isWindowConfigPresent) {
+ Class<?> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
+ return TypeResolver.resolveRawArguments(classParent, userClass);
+ }
+
+ /**
+ * Returns the interface whose type arguments describe {@code userClass}: {@link WindowFunction} or
+ * {@code java.util.function.Function} when a window config is present, otherwise the Pulsar
+ * {@link Function} or {@code java.util.function.Function}.
+ */
+ public static Class<?> getFunctionClassParent(Class<?> userClass, boolean isWindowConfigPresent) {
+ if (isWindowConfigPresent) {
+ if (WindowFunction.class.isAssignableFrom(userClass)) {
+ return WindowFunction.class;
+ } else {
+ return java.util.function.Function.class;
}
} else {
if (Function.class.isAssignableFrom(userClass)) {
- typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(Function.class, userClass);
+ return Function.class;
} else {
- typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(java.util.function.Function.class, userClass);
+ return java.util.function.Function.class;
}
}
-
- return typeArgs;
}
- private static Class<?>[] getFunctionTypesUnwrappingRecordIfNeeded(Class<?> type, Class<?> subType) {
- Class<?>[] typeArgs = TypeResolver.resolveRawArguments(type, subType);
- if (typeArgs[1].equals(Record.class)) {
- Type genericType = TypeResolver.resolveGenericType(type, subType);
- Type recordType = ((ParameterizedType) genericType).getActualTypeArguments()[1];
- Type actualInputType = ((ParameterizedType) recordType).getActualTypeArguments()[0];
- typeArgs[1] = (Class<?>) actualInputType;
- }
- return typeArgs;
+ /**
+ * Returns the first type argument of the generic type found at {@code position} among the type
+ * arguments of {@code type} as implemented by {@code subType}, e.g. {@code T} for {@code Record<T>}.
+ * A parameterized inner type such as {@code List<String>} resolves to its raw class.
+ *
+ * @throws IllegalArgumentException if the wrapper type is raw or the inner type is not a class
+ */
+ private static Type unwrapType(Class<?> type, Class<?> subType, int position) {
+ Type genericType = TypeResolver.resolveGenericType(type, subType);
+ Type argType = ((ParameterizedType) genericType).getActualTypeArguments()[position];
+ if (!(argType instanceof ParameterizedType)) {
+ throw new IllegalArgumentException(String.format(
+ "Type argument %d of %s in %s must be parameterized, but was %s",
+ position, type.getName(), subType.getName(), argType.getTypeName()));
+ }
+ Type innerType = ((ParameterizedType) argType).getActualTypeArguments()[0];
+ if (innerType instanceof ParameterizedType) {
+ // e.g. Record<List<String>>: use the raw class, mirroring the raw classes of resolveRawArguments
+ innerType = ((ParameterizedType) innerType).getRawType();
+ }
+ if (!(innerType instanceof Class)) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot resolve the class wrapped by %s in %s", argType.getTypeName(), subType.getName()));
+ }
+ return innerType;
}
public static Object createInstance(String userClassName, ClassLoader classLoader) {
@@ -401,7 +445,7 @@ public class FunctionCommon {
}
public static ClassLoader getClassLoaderFromPackage(
- org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType componentType,
+ ComponentType componentType,
String className,
File packageFile,
String narExtractionDirectory) {
@@ -437,11 +481,9 @@ public class FunctionCommon {
narClassLoaderException);
}
try {
- if (componentType
- == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.FUNCTION) {
- connectorClassName = FunctionUtils.getFunctionClass((NarClassLoader) narClassLoader);
- } else if (componentType
- == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
+ if (componentType == ComponentType.FUNCTION) {
+ connectorClassName = FunctionUtils.getFunctionClass(narClassLoader);
+ } else if (componentType == ComponentType.SOURCE) {
connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
} else {
connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
@@ -532,26 +574,41 @@ public class FunctionCommon {
}
public static boolean isFunctionCodeBuiltin(
- org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetails) {
- if (functionDetails.hasSource()) {
+ org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetails) {
+ ComponentType componentType = functionDetails.getComponentType();
+ if (componentType == ComponentType.UNKNOWN) {
+ // Component type not set: keep the pre-PIP-193 behavior of reporting builtin code
+ // when the source, the sink or the function is builtin.
+ return isFunctionCodeBuiltin(functionDetails, ComponentType.SOURCE)
+ || isFunctionCodeBuiltin(functionDetails, ComponentType.SINK)
+ || isFunctionCodeBuiltin(functionDetails, ComponentType.FUNCTION);
+ }
+ return isFunctionCodeBuiltin(functionDetails, componentType);
+ }
+
+ /**
+ * Returns whether the code of the given component is builtin: the source or sink builtin for
+ * {@code SOURCE}/{@code SINK}, the function builtin for {@code FUNCTION} (which is also how a
+ * sink's transform function is checked).
+ */
+ public static boolean isFunctionCodeBuiltin(
+ org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetails,
+ ComponentType componentType) {
+ if (componentType == ComponentType.SOURCE && functionDetails.hasSource()) {
org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = functionDetails.getSource();
if (!isEmpty(sourceSpec.getBuiltin())) {
return true;
}
}
- if (functionDetails.hasSink()) {
+ if (componentType == ComponentType.SINK && functionDetails.hasSink()) {
org.apache.pulsar.functions.proto.Function.SinkSpec sinkSpec = functionDetails.getSink();
if (!isEmpty(sinkSpec.getBuiltin())) {
return true;
}
}
- if (!isEmpty(functionDetails.getBuiltin())) {
- return true;
- }
-
- return false;
+ return componentType == ComponentType.FUNCTION && !isEmpty(functionDetails.getBuiltin());
}
public static SubscriptionInitialPosition convertFromFunctionDetailsSubscriptionPosition(
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index d24542185e5..bcd56b1d25f 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -562,7 +562,6 @@ public class FunctionConfigUtils {
} else if (functionConfig.getGo() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.GO);
}
-
WindowConfig windowConfig = functionConfig.getWindowConfig();
if (windowConfig != null) {
WindowConfigUtils.inferMissingArguments(windowConfig);
@@ -570,7 +569,7 @@ public class FunctionConfigUtils {
}
}
- private static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
+ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
String functionClassName = functionConfig.getClassName();
Class functionClass;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 31b147a1caf..2a06339d00a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -22,6 +22,8 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getFunctionTypes;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getRawFunctionTypes;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.gson.Gson;
@@ -50,9 +52,11 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@Slf4j
@@ -64,6 +68,7 @@ public class SinkConfigUtils {
public static class ExtractedSinkDetails {
private String sinkClassName;
private String typeArg;
+ private String functionClassName;
}
public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetails sinkDetails) throws IOException {
@@ -88,7 +93,14 @@ public class SinkConfigUtils {
} else {
functionDetailsBuilder.setParallelism(1);
}
- functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+ if (sinkDetails.getFunctionClassName() != null) {
+ functionDetailsBuilder.setClassName(sinkDetails.getFunctionClassName());
+ } else {
+ functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+ }
+ if (sinkConfig.getTransformFunctionConfig() != null) {
+ functionDetailsBuilder.setUserConfig(sinkConfig.getTransformFunctionConfig());
+ }
if (sinkConfig.getProcessingGuarantees() != null) {
functionDetailsBuilder.setProcessingGuarantees(
convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
@@ -224,6 +236,11 @@ public class SinkConfigUtils {
sinkSpecBuilder.setBuiltin(builtin);
}
+ if (!isEmpty(sinkConfig.getTransformFunction())
+ && sinkConfig.getTransformFunction().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+ functionDetailsBuilder.setBuiltin(sinkConfig.getTransformFunction().replaceFirst("^builtin://", ""));
+ }
+
if (sinkConfig.getConfigs() != null) {
sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs()));
}
@@ -369,11 +386,23 @@ public class SinkConfigUtils {
}
}
+ if (!isEmpty(functionDetails.getBuiltin())) {
+ sinkConfig.setTransformFunction("builtin://" + functionDetails.getBuiltin());
+ }
+ String transformClassName = functionDetails.getClassName();
+ if (!isEmpty(transformClassName) && !transformClassName.equals(IdentityFunction.class.getName())) {
+ sinkConfig.setTransformFunctionClassName(transformClassName);
+ }
+ if (!isEmpty(functionDetails.getUserConfig())) {
+ sinkConfig.setTransformFunctionConfig(functionDetails.getUserConfig());
+ }
+
return sinkConfig;
}
public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConfig,
ClassLoader sinkClassLoader,
+ ClassLoader functionClassLoader,
boolean validateConnectorConfig) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
@@ -429,18 +458,47 @@ public class SinkConfigUtils {
String.format("Sink class %s not found in class loader", sinkClassName), e);
}
- // extract type from sink class
- Class<?> typeArg = getSinkType(sinkClass);
+ String functionClassName = sinkConfig.getTransformFunctionClassName();
+ Class<?> typeArg;
+ ClassLoader inputClassLoader;
+ if (functionClassLoader != null) {
+ // if function class name in sink config is not set, this should be a built-in function
+ // thus we should try to find it class name in the NAR service definition
+ if (functionClassName == null) {
+ try {
+ functionClassName = FunctionUtils.getFunctionClass(functionClassLoader);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to extract function class from archive", e);
+ }
+ }
+ Class functionClass;
+ try {
+ functionClass = functionClassLoader.loadClass(functionClassName);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ String.format("Function class %s not found in class loader", functionClassName), e);
+ }
+ // extract type from transform function class
+ if (!getRawFunctionTypes(functionClass, false)[1].equals(Record.class)) {
+ throw new IllegalArgumentException("Sink transform function output must be of type Record");
+ }
+ typeArg = getFunctionTypes(functionClass, false)[0];
+ inputClassLoader = functionClassLoader;
+ } else {
+ // extract type from sink class
+ typeArg = getSinkType(sinkClass);
+ inputClassLoader = sinkClassLoader;
+ }
if (sinkConfig.getTopicToSerdeClassName() != null) {
for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
- ValidatorUtils.validateSerde(serdeClassName, typeArg, sinkClassLoader, true);
+ ValidatorUtils.validateSerde(serdeClassName, typeArg, inputClassLoader, true);
}
}
if (sinkConfig.getTopicToSchemaType() != null) {
for (String schemaType : sinkConfig.getTopicToSchemaType().values()) {
- ValidatorUtils.validateSchema(schemaType, typeArg, sinkClassLoader, true);
+ ValidatorUtils.validateSchema(schemaType, typeArg, inputClassLoader, true);
}
}
@@ -453,13 +511,13 @@ public class SinkConfigUtils {
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
}
if (!isEmpty(consumerSpec.getSerdeClassName())) {
- ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg, sinkClassLoader, true);
+ ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg, inputClassLoader, true);
}
if (!isEmpty(consumerSpec.getSchemaType())) {
- ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, sinkClassLoader, true);
+ ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, inputClassLoader, true);
}
if (consumerSpec.getCryptoConfig() != null) {
- ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(), sinkClassLoader, false);
+ ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(), inputClassLoader, false);
}
}
}
@@ -469,7 +527,7 @@ public class SinkConfigUtils {
validateSinkConfig(sinkConfig, (NarClassLoader) sinkClassLoader);
}
- return new ExtractedSinkDetails(sinkClassName, typeArg.getName());
+ return new ExtractedSinkDetails(sinkClassName, typeArg.getName(), functionClassName);
}
private static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
@@ -615,6 +673,15 @@ public class SinkConfigUtils {
if (newConfig.getCleanupSubscription() != null) {
mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
}
+ if (newConfig.getTransformFunction() != null) {
+ mergedConfig.setTransformFunction(newConfig.getTransformFunction());
+ }
+ if (newConfig.getTransformFunctionClassName() != null) {
+ mergedConfig.setTransformFunctionClassName(newConfig.getTransformFunctionClassName());
+ }
+ if (newConfig.getTransformFunctionConfig() != null) {
+ mergedConfig.setTransformFunctionConfig(newConfig.getTransformFunctionConfig());
+ }
return mergedConfig;
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 60f7d30052e..4f316cac7c7 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -70,7 +70,7 @@ public class SinkConfigUtilsTest {
assertThrows(IllegalArgumentException.class, () -> {
SinkConfigUtils.convert(sinkConfig,
- new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
});
}
@@ -111,7 +111,10 @@ public class SinkConfigUtilsTest {
sinkConfig.setResources(Resources.getDefaultResources());
- Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ sinkConfig.setTransformFunction("builtin://transform");
+ sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}");
+
+ Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertEquals(Function.SubscriptionType.SHARED, functionDetails.getSource().getSubscriptionType());
SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(
@@ -122,7 +125,7 @@ public class SinkConfigUtilsTest {
sinkConfig.setRetainOrdering(true);
sinkConfig.setRetainKeyOrdering(false);
- functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertEquals(Function.SubscriptionType.FAILOVER, functionDetails.getSource().getSubscriptionType());
convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(
@@ -132,7 +135,7 @@ public class SinkConfigUtilsTest {
sinkConfig.setRetainOrdering(false);
sinkConfig.setRetainKeyOrdering(true);
- functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertEquals(Function.SubscriptionType.KEY_SHARED, functionDetails.getSource().getSubscriptionType());
convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(
@@ -146,7 +149,7 @@ public class SinkConfigUtilsTest {
for (Boolean testcase : testcases) {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setRetainOrdering(testcase);
- Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(result.getRetainOrdering(), testcase != null ? testcase : Boolean.valueOf(false));
}
@@ -158,7 +161,7 @@ public class SinkConfigUtilsTest {
for (Boolean testcase : testcases) {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setRetainKeyOrdering(testcase);
- Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(result.getRetainKeyOrdering(), testcase != null ? testcase : Boolean.valueOf(false));
}
@@ -176,7 +179,7 @@ public class SinkConfigUtilsTest {
for (FunctionConfig.ProcessingGuarantees testcase : testcases) {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setProcessingGuarantees(testcase);
- Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(result.getProcessingGuarantees(), testcase == null ? ATLEAST_ONCE : testcase);
}
@@ -189,7 +192,7 @@ public class SinkConfigUtilsTest {
for (Boolean testcase : testcases) {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setCleanupSubscription(testcase);
- Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(result.getCleanupSubscription(), testcase == null ? Boolean.valueOf(true) : testcase);
}
@@ -425,6 +428,57 @@ public class SinkConfigUtilsTest {
);
}
+ /**
+ * Verifies that validateUpdate takes transformFunction from the update and that, once that field is
+ * restored, the merged config serializes identically to the original.
+ */
+ @Test
+ public void testMergeDifferentTransformFunction() {
+ final String updatedFunction = "builtin://new";
+ SinkConfig original = createSinkConfig();
+ SinkConfig update = createUpdatedSinkConfig("transformFunction", updatedFunction);
+ SinkConfig merged = SinkConfigUtils.validateUpdate(original, update);
+ assertEquals(merged.getTransformFunction(), updatedFunction);
+ // Put the original value back so the remaining fields can be compared wholesale.
+ merged.setTransformFunction(original.getTransformFunction());
+ Gson gson = new Gson();
+ assertEquals(gson.toJson(original), gson.toJson(merged));
+ }
+
+ /**
+ * Verifies that validateUpdate takes transformFunctionClassName from the update and that, once that
+ * field is restored, the merged config serializes identically to the original.
+ */
+ @Test
+ public void testMergeDifferentTransformFunctionClassName() {
+ final String updatedClassName = "NewTransformFunction";
+ SinkConfig original = createSinkConfig();
+ SinkConfig update = createUpdatedSinkConfig("transformFunctionClassName", updatedClassName);
+ SinkConfig merged = SinkConfigUtils.validateUpdate(original, update);
+ assertEquals(merged.getTransformFunctionClassName(), updatedClassName);
+ // Put the original value back so the remaining fields can be compared wholesale.
+ merged.setTransformFunctionClassName(original.getTransformFunctionClassName());
+ Gson gson = new Gson();
+ assertEquals(gson.toJson(original), gson.toJson(merged));
+ }
+
+ /**
+ * Verifies that validateUpdate takes transformFunctionConfig from the update and that, once that
+ * field is restored, the merged config serializes identically to the original.
+ */
+ @Test
+ public void testMergeDifferentTransformFunctionConfig() {
+ final String updatedConfig = "{\"new-key\": \"new-value\"}";
+ SinkConfig original = createSinkConfig();
+ SinkConfig update = createUpdatedSinkConfig("transformFunctionConfig", updatedConfig);
+ SinkConfig merged = SinkConfigUtils.validateUpdate(original, update);
+ assertEquals(merged.getTransformFunctionConfig(), updatedConfig);
+ // Put the original value back so the remaining fields can be compared wholesale.
+ merged.setTransformFunctionConfig(original.getTransformFunctionConfig());
+ Gson gson = new Gson();
+ assertEquals(gson.toJson(original), gson.toJson(merged));
+ }
+
@Test
public void testValidateConfig() {
SinkConfig sinkConfig = createSinkConfig();
@@ -459,6 +513,9 @@ public class SinkConfigUtilsTest {
sinkConfig.setCleanupSubscription(true);
sinkConfig.setArchive("DummyArchive.nar");
sinkConfig.setCleanupSubscription(true);
+ sinkConfig.setTransformFunction("builtin://transform");
+ sinkConfig.setTransformFunctionClassName("Transform");
+ sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}");
return sinkConfig;
}
@@ -478,7 +535,7 @@ public class SinkConfigUtilsTest {
@Test
public void testPoolMessages() throws IOException {
SinkConfig sinkConfig = createSinkConfig();
- Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertFalse(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertFalse(convertedConfig.getInputSpecs().get("test-input").isPoolMessages());
@@ -488,7 +545,7 @@ public class SinkConfigUtilsTest {
.poolMessages(true).build());
sinkConfig.setInputSpecs(inputSpecs);
- functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertTrue(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertTrue(convertedConfig.getInputSpecs().get("test-input").isPoolMessages());
@@ -499,13 +556,13 @@ public class SinkConfigUtilsTest {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setInputSpecs(null);
sinkConfig.setTopicsPattern("my-topic-*");
- SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(),
+ SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(), null,
true);
sinkConfig.setTimeoutMs(null);
- SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(),
+ SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(), null,
true);
sinkConfig.setTimeoutMs(0L);
- SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(),
+ SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(), null,
true);
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 01b240d7f97..a4f8532dbc6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.pulsar.common.functions.Utils.FILE;
import static org.apache.pulsar.common.functions.Utils.HTTP;
import static org.apache.pulsar.common.functions.Utils.hasPackageTypePrefix;
@@ -30,9 +31,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
@@ -106,37 +107,30 @@ public class FunctionActioner {
functionDetails.getName(), instanceId);
String packageFile;
+ String transformFunctionPackageFile = null;
- String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
- boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
+ Function.PackageLocationMetaData pkgLocation = functionMetaData.getPackageLocation();
+ Function.PackageLocationMetaData transformFunctionPkgLocation =
+ functionMetaData.getTransformFunctionPackageLocation();
if (runtimeFactory.externallyManaged()) {
- packageFile = pkgLocation;
+ packageFile = pkgLocation.getPackagePath();
+ transformFunctionPackageFile = transformFunctionPkgLocation.getPackagePath();
} else {
- if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
- URL url = new URL(pkgLocation);
- File pkgFile = new File(url.toURI());
- packageFile = pkgFile.getAbsolutePath();
- } else if (FunctionCommon.isFunctionCodeBuiltin(functionDetails)) {
- File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
- packageFile = pkgFile.getAbsolutePath();
- } else {
- File pkgDir = new File(workerConfig.getDownloadDirectory(),
- getDownloadPackagePath(functionMetaData, instanceId));
- pkgDir.mkdirs();
- File pkgFile = new File(
- pkgDir,
- new File(getDownloadFileName(functionMetaData.getFunctionDetails(),
- functionMetaData.getPackageLocation())).getName());
- downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
- packageFile = pkgFile.getAbsolutePath();
+ packageFile = getPackageFile(functionMetaData, functionDetails, instanceId, pkgLocation,
+ InstanceUtils.calculateSubjectType(functionDetails));
+ if (!isEmpty(transformFunctionPkgLocation.getPackagePath())) {
+ transformFunctionPackageFile =
+ getPackageFile(functionMetaData, functionDetails, instanceId, transformFunctionPkgLocation,
+ FunctionDetails.ComponentType.FUNCTION);
}
}
// Setup for batch sources if necessary
setupBatchSource(functionDetails);
- RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
+ RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(),
+ packageFile, transformFunctionPackageFile);
functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
runtimeSpawner.start();
@@ -149,7 +143,38 @@ public class FunctionActioner {
}
}
- RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile) {
+ /**
+ * Resolves the local path of the package described by {@code pkgLocation}.
+ * <p>
+ * {@code FILE}-prefixed package URLs are used in place, built-in code for {@code componentType} is
+ * resolved through {@code getBuiltinArchive}, and anything else is downloaded into the per-instance
+ * download directory.
+ *
+ * @return absolute path of the resolved package file
+ */
+ private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails functionDetails, int instanceId,
+ Function.PackageLocationMetaData pkgLocation,
+ FunctionDetails.ComponentType componentType)
+ throws URISyntaxException, IOException, ClassNotFoundException, PulsarAdminException {
+ final String path = pkgLocation.getPackagePath();
+ final boolean urlProvided = isFunctionPackageUrlSupported(path);
+ // Local file URL: the archive is already on disk, no copy needed.
+ if (urlProvided && path.startsWith(FILE)) {
+ return new File(new URL(path).toURI()).getAbsolutePath();
+ }
+ // Built-in code ships with the worker.
+ if (FunctionCommon.isFunctionCodeBuiltin(functionDetails, componentType)) {
+ return getBuiltinArchive(componentType,
+ FunctionDetails.newBuilder(functionMetaData.getFunctionDetails())).getAbsolutePath();
+ }
+ // Everything else is fetched into the instance's download directory.
+ File downloadDir = new File(workerConfig.getDownloadDirectory(),
+ getDownloadPackagePath(functionMetaData, instanceId));
+ downloadDir.mkdirs();
+ String fileName = new File(
+ getDownloadFileName(functionMetaData.getFunctionDetails(), pkgLocation)).getName();
+ File target = new File(downloadDir, fileName);
+ downloadFile(target, urlProvided, functionMetaData, instanceId, pkgLocation);
+ return target.getAbsolutePath();
+ }
+
+ RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile,
+ String transformFunctionPackageFile) {
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
int instanceId = instance.getInstanceId();
@@ -170,6 +195,8 @@ public class FunctionActioner {
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
functionMetaData.getPackageLocation().getOriginalFileName(),
+ transformFunctionPackageFile,
+ functionMetaData.getTransformFunctionPackageLocation().getOriginalFileName(),
runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
return runtimeSpawner;
@@ -181,6 +208,7 @@ public class FunctionActioner {
instanceConfig.setFunctionDetails(functionDetails);
// TODO: set correct function id and version when features implemented
instanceConfig.setFunctionId(UUID.randomUUID().toString());
+ instanceConfig.setTransformFunctionId(UUID.randomUUID().toString());
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(1024);
@@ -197,7 +225,8 @@ public class FunctionActioner {
}
private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData,
- int instanceId) throws FileNotFoundException, IOException, PulsarAdminException {
+ int instanceId, Function.PackageLocationMetaData pkgLocation)
+ throws IOException, PulsarAdminException {
FunctionDetails details = functionMetaData.getFunctionDetails();
File pkgDir = pkgFile.getParentFile();
@@ -213,12 +242,12 @@ public class FunctionActioner {
pkgDir,
pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID());
} while (tempPkgFile.exists() || !tempPkgFile.createNewFile());
- String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
+ String pkgLocationPath = pkgLocation.getPackagePath();
boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP);
boolean downloadFromPackageManagementService = isPkgUrlProvided && hasPackageTypePrefix(pkgLocationPath);
log.info("{}/{}/{} Function package file {} will be downloaded from {}", tempPkgFile, details.getTenant(),
details.getNamespace(), details.getName(),
- downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation());
+ downloadFromHttp ? pkgLocationPath : pkgLocation);
if (downloadFromHttp) {
FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
@@ -247,7 +276,7 @@ public class FunctionActioner {
} catch (FileAlreadyExistsException faee) {
// file already exists
log.warn("Function package has been downloaded from {} and saved at {}",
- functionMetaData.getPackageLocation(), pkgFile);
+ pkgLocation, pkgFile);
}
} finally {
tempPkgFile.delete();
@@ -485,8 +514,9 @@ public class FunctionActioner {
File.separatorChar);
}
- private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException, ClassNotFoundException {
- if (functionDetails.hasSource()) {
+ private File getBuiltinArchive(FunctionDetails.ComponentType componentType, FunctionDetails.Builder functionDetails)
+ throws IOException, ClassNotFoundException {
+ if (componentType == FunctionDetails.ComponentType.SOURCE && functionDetails.hasSource()) {
SourceSpec sourceSpec = functionDetails.getSource();
if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
Connector connector = connectorsManager.getConnector(sourceSpec.getBuiltin());
@@ -501,7 +531,7 @@ public class FunctionActioner {
}
}
- if (functionDetails.hasSink()) {
+ if (componentType == FunctionDetails.ComponentType.SINK && functionDetails.hasSink()) {
SinkSpec sinkSpec = functionDetails.getSink();
if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
Connector connector = connectorsManager.getConnector(sinkSpec.getBuiltin());
@@ -517,7 +547,8 @@ public class FunctionActioner {
}
}
- if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
+ if (componentType == FunctionDetails.ComponentType.FUNCTION
+ && !StringUtils.isEmpty(functionDetails.getBuiltin())) {
return functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 006437cee3b..c05ffe9ca8c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -758,7 +758,12 @@ public class FunctionRuntimeManager implements AutoCloseable {
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
RuntimeSpawner runtimeSpawner = functionActioner.getRuntimeSpawner(
assignment.getInstance(),
- assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath());
+ assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath(),
+ assignment
+ .getInstance()
+ .getFunctionMetaData()
+ .getTransformFunctionPackageLocation()
+ .getPackagePath());
// re-initialize if necessary
runtimeSpawner.getRuntime().reinitialize();
newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index f3ebef89b92..132641c8f01 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -26,9 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.utils.FunctionCommon.createPkgTempFile;
import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
-import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-import static org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.downloadPackageFile;
import com.google.common.base.Utf8;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
@@ -42,6 +40,7 @@ import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
@@ -67,6 +66,7 @@ import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
@@ -292,16 +292,29 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData,
final String functionPkgUrl,
final FormDataContentDisposition fileDetail,
- final File uploadedInputStreamAsFile) throws Exception {
+ final File uploadedInputStreamAsFile)
+ throws Exception {
+ return getFunctionPackageLocation(functionMetaData, functionPkgUrl, fileDetail, uploadedInputStreamAsFile,
+ functionMetaData.getFunctionDetails().getName(), componentType,
+ getFunctionCodeBuiltin(functionMetaData.getFunctionDetails(), componentType));
+ }
+
+ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData,
+ final String functionPkgUrl,
+ final FormDataContentDisposition fileDetail,
+ final File uploadedInputStreamAsFile,
+ final String componentName,
+ final FunctionDetails.ComponentType componentType,
+ final String builtin)
+ throws Exception {
FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
String tenant = functionDetails.getTenant();
String namespace = functionDetails.getNamespace();
- String componentName = functionDetails.getName();
PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder();
- boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
boolean isPackageManagementEnabled = worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement();
- PackageName packageName = PackageName.get(functionDetails.getComponentType().name(),
+ PackageName packageName = PackageName.get(
+ componentType.name(),
tenant,
namespace,
componentName,
@@ -311,7 +324,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
// For externally managed schedulers, the pkgUrl/builtin stuff can be copied to bk
// if the function worker image does not include connectors
- if (isBuiltin) {
+ if (!isEmpty(builtin)) {
if (worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
File component;
String archiveName;
@@ -328,7 +341,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
archiveName = functionDetails.getBuiltin();
component = worker().getFunctionsManager().getFunctionArchive(archiveName).toFile();
break;
- }
+ }
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
component.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(component.getName());
@@ -337,6 +350,8 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
worker().getBrokerAdmin().packages().upload(metadata,
packageName.toString(), component.getAbsolutePath());
} else {
+ packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace,
+ componentName, component.getName()));
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
component, worker().getDlogNamespace());
}
@@ -344,8 +359,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
packageLocationMetaDataBuilder.getPackagePath());
} else {
log.info("Skipping upload for the built-in package {}", ComponentTypeUtils.toString(componentType));
- packageLocationMetaDataBuilder
- .setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
+ packageLocationMetaDataBuilder.setPackagePath("builtin://" + builtin);
}
} else if (isPkgUrlProvided) {
packageLocationMetaDataBuilder.setOriginalFileName(uploadedInputStreamAsFile.getName());
@@ -398,8 +412,8 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
}
} else {
// For pulsar managed schedulers, the pkgUrl/builtin stuff should be copied to bk
- if (isBuiltin) {
- packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
+ if (!isEmpty(builtin)) {
+ packageLocationMetaDataBuilder.setPackagePath("builtin://" + builtin);
} else if (isPkgUrlProvided) {
packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl);
} else if (functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)
@@ -500,8 +514,19 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
String.format("Error deleting %s @ /%s/%s/%s",
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));
- // clean up component files stored in BK
- String functionPackagePath = functionMetaData.getPackageLocation().getPackagePath();
+ deleteComponentFromStorage(tenant, namespace, componentName,
+ functionMetaData.getPackageLocation().getPackagePath());
+
+ if (!isEmpty(functionMetaData.getTransformFunctionPackageLocation().getPackagePath())) {
+ deleteComponentFromStorage(tenant, namespace, componentName,
+ functionMetaData.getTransformFunctionPackageLocation().getPackagePath());
+ }
+
+ deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);
+ }
+
+ private void deleteComponentFromStorage(String tenant, String namespace, String componentName,
+ String functionPackagePath) {
if (!functionPackagePath.startsWith(Utils.HTTP)
&& !functionPackagePath.startsWith(Utils.FILE)
&& !functionPackagePath.startsWith(Utils.BUILTIN)) {
@@ -509,22 +534,19 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
try {
worker().getBrokerAdmin().packages().delete(functionPackagePath);
} catch (PulsarAdminException e) {
- log.error("{}/{}/{} Failed to cleanup package in package managemanet with url {}", tenant,
- namespace, componentName, functionMetaData.getPackageLocation().getPackagePath(), e);
+ log.error("{}/{}/{} Failed to cleanup package in package management with url {}", tenant,
+ namespace, componentName, functionPackagePath, e);
}
} else {
try {
- WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(),
- functionMetaData.getPackageLocation().getPackagePath());
+ WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionPackagePath);
} catch (IOException e) {
log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName,
- functionMetaData.getPackageLocation().getPackagePath(), e);
+ functionPackagePath, e);
}
}
}
-
- deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);
}
@Override
@@ -573,8 +595,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
throw new RestException(Status.NOT_FOUND,
String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
}
- FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
- return config;
+ return FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
}
@Override
@@ -1149,7 +1170,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
Reader<byte[]> reader = null;
Producer<byte[]> producer = null;
try {
- if (outputTopic != null && !outputTopic.isEmpty()) {
+ if (!isEmpty(outputTopic)) {
reader = worker().getClient().newReader()
.topic(outputTopic)
.startMessageId(MessageId.latest)
@@ -1420,7 +1441,8 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
@Override
public StreamingOutput downloadFunction(String tenant, String namespace, String componentName,
- String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
+ String clientRole, AuthenticationDataSource clientAuthenticationDataHttps,
+ boolean transformFunction) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
@@ -1445,14 +1467,17 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
- String pkgPath = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName)
- .getPackageLocation().getPackagePath();
+ FunctionMetaData functionMetaData =
+ functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+ String pkgPath = transformFunction
+ ? functionMetaData.getTransformFunctionPackageLocation().getPackagePath()
+ : functionMetaData.getPackageLocation().getPackagePath();
return getStreamingOutput(pkgPath);
}
private StreamingOutput getStreamingOutput(String pkgPath) {
- final StreamingOutput streamingOutput = output -> {
+ return output -> {
if (pkgPath.startsWith(Utils.HTTP)) {
URL url = URI.create(pkgPath).toURL();
try (InputStream inputStream = url.openStream()) {
@@ -1493,7 +1518,6 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath);
}
};
- return streamingOutput;
}
@Override
@@ -1605,23 +1629,26 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
}
}
- private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
- if (functionDetails.hasSource()) {
+ private String getFunctionCodeBuiltin(FunctionDetails functionDetails,
+ FunctionDetails.ComponentType componentType) {
+ if (componentType == FunctionDetails.ComponentType.SOURCE && functionDetails.hasSource()) {
SourceSpec sourceSpec = functionDetails.getSource();
if (!isEmpty(sourceSpec.getBuiltin())) {
return sourceSpec.getBuiltin();
}
}
- if (functionDetails.hasSink()) {
+ if (componentType == FunctionDetails.ComponentType.SINK && functionDetails.hasSink()) {
SinkSpec sinkSpec = functionDetails.getSink();
if (!isEmpty(sinkSpec.getBuiltin())) {
return sinkSpec.getBuiltin();
}
}
- if (!isEmpty(functionDetails.getBuiltin())) {
- return functionDetails.getBuiltin();
+ if (componentType == FunctionDetails.ComponentType.FUNCTION) {
+ if (!isEmpty(functionDetails.getBuiltin())) {
+ return functionDetails.getBuiltin();
+ }
}
return null;
@@ -1837,4 +1864,87 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
String narExtractionDirectory) {
return FunctionCommon.getClassLoaderFromPackage(componentType, className, packageFile, narExtractionDirectory);
}
+
+    /**
+     * Downloads a package from the broker's package management service into a new temporary file.
+     *
+     * <p>The file is created in the worker's configured download directory, falling back to the
+     * NAR extraction directory when no download directory is configured. If the download fails,
+     * the temporary file is removed before the exception propagates.
+     *
+     * @param worker      worker service providing configuration and broker admin access
+     * @param packageName name of the package to download
+     * @return the downloaded file; callers are expected to delete it when done
+     * @throws IOException          if the directory or the temporary file cannot be created
+     * @throws PulsarAdminException if the broker admin download fails
+     */
+    static File downloadPackageFile(PulsarWorkerService worker, String packageName)
+            throws IOException, PulsarAdminException {
+        Path tempDirectory;
+        if (worker.getWorkerConfig().getDownloadDirectory() != null) {
+            tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory());
+        } else {
+            // use the Nar extraction directory as a temporary directory for downloaded files
+            tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory());
+        }
+        Files.createDirectories(tempDirectory);
+        File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile();
+        boolean downloaded = false;
+        try {
+            worker.getBrokerAdmin().packages().download(packageName, file.toString());
+            downloaded = true;
+        } finally {
+            if (!downloaded) {
+                // don't leave an empty/partial temp file behind when the download fails
+                file.delete();
+            }
+        }
+        return file;
+    }
+
+    /**
+     * Resolves the local package file to use when updating an existing component.
+     *
+     * <p>Candidates are tried in this order:
+     * <ol>
+     *   <li>an explicitly provided package URL ({@code functionPkgUrl});</li>
+     *   <li>an existing package path starting with the file or http prefix;</li>
+     *   <li>an uploaded package stream;</li>
+     *   <li>for any other non-builtin existing path, the package stored by the worker (package
+     *       management when enabled, otherwise BookKeeper), copied into a temp file.</li>
+     * </ol>
+     * If none apply (an existing builtin package with no new URL or upload), {@code null} is returned.
+     *
+     * @param functionPkgUrl      package URL supplied with the update request, may be blank
+     * @param existingPackagePath package path recorded for the existing component
+     * @param uploadedInputStream uploaded package content, may be {@code null}
+     * @return the local package file, or {@code null} as described above
+     * @throws IllegalArgumentException if the package cannot be fetched from a URL
+     */
+    protected File getPackageFile(String functionPkgUrl, String existingPackagePath, InputStream uploadedInputStream)
+            throws IOException, PulsarAdminException {
+        File componentPackageFile = null;
+        if (isNotBlank(functionPkgUrl)) {
+            componentPackageFile = getPackageFile(functionPkgUrl);
+        } else if (existingPackagePath.startsWith(Utils.FILE) || existingPackagePath.startsWith(Utils.HTTP)) {
+            try {
+                componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingPackagePath);
+            } catch (Exception e) {
+                // report the path that actually failed: functionPkgUrl is blank in this branch
+                throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
+                        + "when getting %s package from %s", e.getMessage(),
+                        ComponentTypeUtils.toString(componentType), existingPackagePath), e);
+            }
+        } else if (uploadedInputStream != null) {
+            componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+        } else if (!existingPackagePath.startsWith(Utils.BUILTIN)) {
+            componentPackageFile = FunctionCommon.createPkgTempFile();
+            componentPackageFile.deleteOnExit();
+            if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
+                worker().getBrokerAdmin().packages().download(
+                        existingPackagePath,
+                        componentPackageFile.getAbsolutePath());
+            } else {
+                WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
+                        componentPackageFile, existingPackagePath);
+            }
+        }
+        return componentPackageFile;
+    }
+
+    /**
+     * Downloads {@code packageName} through the package management service, using this
+     * component's worker service for configuration and broker access.
+     *
+     * @param packageName name of the package to download
+     * @return the downloaded temporary file
+     * @see #downloadPackageFile(PulsarWorkerService, String)
+     */
+    protected File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
+        PulsarWorkerService workerService = worker();
+        return downloadPackageFile(workerService, packageName);
+    }
+
+    /**
+     * Fetches the package referenced by {@code functionPkgUrl} into a local file.
+     *
+     * <p>URLs carrying a package-management type prefix are downloaded through the package
+     * service. Any other URL must pass {@code Utils.isFunctionPackageUrlSupported} and is then
+     * resolved with {@code FunctionCommon.extractFileFromPkgURL}.
+     *
+     * @param functionPkgUrl the package URL
+     * @return the local package file
+     * @throws IllegalArgumentException if the URL is unsupported or the package cannot be fetched
+     */
+    protected File getPackageFile(String functionPkgUrl) throws IOException, PulsarAdminException {
+        if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
+            return downloadPackageFile(functionPkgUrl);
+        }
+        if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
+            // keep the space after "valid." (previously rendered as "valid.supported")
+            throw new IllegalArgumentException("Function Package url is not valid. "
+                    + "supported url (http/https/file)");
+        }
+        try {
+            return FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
+                    + "when getting %s package from %s", e.getMessage(),
+                    ComponentTypeUtils.toString(componentType), functionPkgUrl), e);
+        }
+    }
+
+    /**
+     * Returns the class loader of the builtin function archive referenced by {@code archive}.
+     *
+     * @param archive a package reference; only values starting with the builtin prefix are resolved
+     * @return the builtin function's class loader, or {@code null} if {@code archive} is empty or
+     *         is not a builtin reference
+     * @throws IllegalArgumentException if {@code archive} names a builtin function that the
+     *         worker's functions manager does not know about
+     */
+    protected ClassLoader getBuiltinFunctionClassLoader(String archive) {
+        if (StringUtils.isEmpty(archive)
+                || !archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+            return null;
+        }
+        String functionName = archive.replaceFirst("^builtin://", "");
+        FunctionArchive function = worker().getFunctionsManager().getFunction(functionName);
+        // check if the builtin function exists; name the function rather than the enclosing
+        // component type (this is used e.g. for a sink's transform function)
+        if (function == null) {
+            throw new IllegalArgumentException("Built-in function " + functionName + " is not available");
+        }
+        return function.getClassLoader();
+    }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 3e7f0ab7339..799de49d3cc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -28,9 +28,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -57,7 +54,6 @@ import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
-import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -149,29 +145,13 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
}
Function.FunctionDetails functionDetails;
- boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
File componentPackageFile = null;
try {
// validate parameters
try {
- if (isPkgUrlProvided) {
- if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
- componentPackageFile = downloadPackageFile(functionPkgUrl);
- } else {
- if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
- throw new IllegalArgumentException("Function Package url is not valid."
- + "supported url (http/https/file)");
- }
- try {
-
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
- + "when getting %s package from %s", e.getMessage(),
- ComponentTypeUtils.toString(componentType), functionPkgUrl), e);
- }
- }
+ if (isNotBlank(functionPkgUrl)) {
+ componentPackageFile = getPackageFile(functionPkgUrl);
functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
functionConfig, componentPackageFile);
} else {
@@ -339,60 +319,17 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
// validate parameters
try {
- if (isNotBlank(functionPkgUrl)) {
- if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
- componentPackageFile = downloadPackageFile(functionPkgUrl);
- } else {
- try {
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
- + "when getting %s package from %s", e.getMessage(),
- ComponentTypeUtils.toString(componentType), functionPkgUrl));
- }
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
- mergedConfig, componentPackageFile);
-
- } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
- || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
- try {
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(
- existingComponent.getPackageLocation().getPackagePath());
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
- + "when getting %s package from %s", e.getMessage(),
- ComponentTypeUtils.toString(componentType), functionPkgUrl));
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
- mergedConfig, componentPackageFile);
- } else if (uploadedInputStream != null) {
-
- componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
- functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
- mergedConfig, componentPackageFile);
-
- } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
- functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
- mergedConfig, componentPackageFile);
- if (!isFunctionCodeBuiltin(functionDetails)
- && (componentPackageFile == null || fileDetail == null)) {
- throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType)
- + " Package is not provided");
- }
- } else {
- componentPackageFile = FunctionCommon.createPkgTempFile();
- componentPackageFile.deleteOnExit();
- if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
- worker().getBrokerAdmin().packages().download(
- existingComponent.getPackageLocation().getPackagePath(),
- componentPackageFile.getAbsolutePath());
- } else {
- WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
- componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
- mergedConfig, componentPackageFile);
+ componentPackageFile = getPackageFile(
+ functionPkgUrl,
+ existingComponent.getPackageLocation().getPackagePath(),
+ uploadedInputStream);
+ functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+ mergedConfig, componentPackageFile);
+ if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)
+ && !isFunctionCodeBuiltin(functionDetails)
+ && (componentPackageFile == null || fileDetail == null)) {
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType)
+ + " Package is not provided");
}
} catch (Exception e) {
log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType),
@@ -821,7 +758,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
if (archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
archive = archive.replaceFirst("^builtin://", "");
- FunctionsManager functionsManager = this.worker().getFunctionsManager();
+ FunctionsManager functionsManager = worker().getFunctionsManager();
FunctionArchive function = functionsManager.getFunction(archive);
// check if builtin function exists
@@ -860,23 +797,4 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
}
}
}
-
- private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
- return downloadPackageFile(worker(), packageName);
- }
-
- static File downloadPackageFile(PulsarWorkerService worker, String packageName)
- throws IOException, PulsarAdminException {
- Path tempDirectory;
- if (worker.getWorkerConfig().getDownloadDirectory() != null) {
- tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory());
- } else {
- // use the Nar extraction directory as a temporary directory for downloaded files
- tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory());
- }
- Files.createDirectories(tempDirectory);
- File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile();
- worker.getBrokerAdmin().packages().download(packageName, file.toString());
- return file;
- }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 7b2c5a68ef2..dede6307e76 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -54,7 +54,6 @@ import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
-import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -144,29 +143,14 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sinkName));
}
- Function.FunctionDetails functionDetails = null;
- boolean isPkgUrlProvided = isNotBlank(sinkPkgUrl);
+ Function.FunctionDetails functionDetails;
File componentPackageFile = null;
try {
// validate parameters
try {
- if (isPkgUrlProvided) {
- if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
- componentPackageFile = downloadPackageFile(sinkPkgUrl);
- } else {
- if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
- throw new IllegalArgumentException(
- "Function Package url is not valid. supported url (http/https/file)");
- }
- try {
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format("Encountered error \"%s\" when getting %s package from %s",
- e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
- }
- }
+ if (isNotBlank(sinkPkgUrl)) {
+ componentPackageFile = getPackageFile(sinkPkgUrl);
functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
sinkConfig, componentPackageFile);
} else {
@@ -243,6 +227,12 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
}
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+ String transformFunction = sinkConfig.getTransformFunction();
+ if (isNotBlank(transformFunction)) {
+ setTransformFunctionPackageLocation(functionMetaDataBuilder, functionDetails, transformFunction);
+ }
+
updateRequest(null, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
@@ -329,67 +319,24 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
- Function.FunctionDetails functionDetails = null;
+ Function.FunctionDetails functionDetails;
File componentPackageFile = null;
try {
// validate parameters
try {
- if (isNotBlank(sinkPkgUrl)) {
- if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
- componentPackageFile = downloadPackageFile(sinkPkgUrl);
- } else {
- try {
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format("Encountered error \"%s\" when getting %s package from %s",
- e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
- }
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
- mergedConfig, componentPackageFile);
-
- } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
- || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
- try {
- componentPackageFile = FunctionCommon
- .extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format("Encountered error \"%s\" when getting %s package from %s",
- e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
- mergedConfig, componentPackageFile);
- } else if (uploadedInputStream != null) {
-
- componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
- functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
- mergedConfig, componentPackageFile);
-
- } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
- functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
- mergedConfig, componentPackageFile);
- if (!isFunctionCodeBuiltin(functionDetails)
- && (componentPackageFile == null || fileDetail == null)) {
+ componentPackageFile = getPackageFile(
+ sinkPkgUrl,
+ existingComponent.getPackageLocation().getPackagePath(),
+ uploadedInputStream);
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+ mergedConfig, componentPackageFile);
+ if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)
+ && !isFunctionCodeBuiltin(functionDetails)
+ && (componentPackageFile == null || fileDetail == null)) {
throw new IllegalArgumentException(
ComponentTypeUtils.toString(componentType) + " Package is not provided");
}
- } else {
- componentPackageFile = FunctionCommon.createPkgTempFile();
- componentPackageFile.deleteOnExit();
- if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
- worker().getBrokerAdmin().packages().download(
- existingComponent.getPackageLocation().getPackagePath(),
- componentPackageFile.getAbsolutePath());
- } else {
- WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile,
- existingComponent.getPackageLocation().getPackagePath());
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
- mergedConfig, componentPackageFile);
- }
} catch (Exception e) {
log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
namespace, sinkName, e);
@@ -466,6 +413,12 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+ String transformFunction = mergedConfig.getTransformFunction();
+ if (isNotBlank(transformFunction)
+ && !transformFunction.equals(existingSinkConfig.getTransformFunction())) {
+ setTransformFunctionPackageLocation(functionMetaDataBuilder, functionDetails, transformFunction);
+ }
+
updateRequest(existingComponent, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
@@ -476,6 +429,34 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
}
}
+    /**
+     * Resolves the package of the sink's transform function and stores its location on
+     * {@code functionMetaDataBuilder} as the transform function package location.
+     *
+     * <p>A non-builtin transform function is first fetched into a local file with
+     * {@link #getPackageFile(String)}. That local file is deleted afterwards, unless the
+     * transform function was given as a file URL (then it is the caller's own file).
+     *
+     * @param functionMetaDataBuilder metadata builder of the sink being created/updated
+     * @param functionDetails         details of the sink, used for naming and logging
+     * @param transformFunction       package reference of the transform function
+     * @throws RestException with {@code INTERNAL_SERVER_ERROR} if the package cannot be processed
+     */
+    private void setTransformFunctionPackageLocation(Function.FunctionMetaData.Builder functionMetaDataBuilder,
+                                                     Function.FunctionDetails functionDetails,
+                                                     String transformFunction) {
+        final boolean builtinReference = transformFunction.startsWith(Utils.BUILTIN);
+        // NOTE(review): the builtin argument is always passed as null, even for builtin://
+        // references; kept as a typed String so overload resolution is unchanged
+        final String builtinArchive = null;
+        File localPackage = null;
+        try {
+            if (!builtinReference) {
+                localPackage = getPackageFile(transformFunction);
+            }
+            Function.FunctionMetaData currentMetaData = functionMetaDataBuilder.build();
+            String transformPackageName = functionDetails.getName() + "__sink-function";
+            Function.PackageLocationMetaData.Builder transformLocation = getFunctionPackageLocation(
+                    currentMetaData, transformFunction, null, localPackage, transformPackageName,
+                    Function.FunctionDetails.ComponentType.FUNCTION, builtinArchive);
+            functionMetaDataBuilder.setTransformFunctionPackageLocation(transformLocation);
+        } catch (Exception e) {
+            log.error("Failed process {} {}/{}/{} extra function package: ",
+                    ComponentTypeUtils.toString(componentType), functionDetails.getTenant(),
+                    functionDetails.getNamespace(), functionDetails.getName(), e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        } finally {
+            boolean userOwnedFile = transformFunction.startsWith(Utils.FILE);
+            if (localPackage != null && localPackage.exists() && !userOwnedFile) {
+                localPackage.delete();
+            }
+        }
+    }
+
private class GetSinkStatus extends GetStatus<SinkStatus, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
@Override
@@ -758,7 +739,8 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
final String namespace,
final String sinkName,
final SinkConfig sinkConfig,
- final File sinkPackageFile) throws IOException {
+ final File sinkPackageFile)
+ throws IOException, PulsarAdminException {
// The rest end points take precedence over whatever is there in sinkConfig
sinkConfig.setTenant(tenant);
@@ -796,8 +778,23 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
throw new IllegalArgumentException("Sink package is not provided");
}
+ ClassLoader functionClassLoader = null;
+ if (isNotBlank(sinkConfig.getTransformFunction())) {
+ functionClassLoader =
+ getBuiltinFunctionClassLoader(sinkConfig.getTransformFunction());
+ if (functionClassLoader == null) {
+ File functionPackageFile = getPackageFile(sinkConfig.getTransformFunction());
+ functionClassLoader = getClassLoaderFromPackage(sinkConfig.getTransformFunctionClassName(),
+ functionPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+ }
+ if (functionClassLoader == null) {
+ throw new IllegalArgumentException("Transform Function package not found");
+ }
+ }
+
SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
- sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+ sinkConfig, classLoader, functionClassLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
} finally {
if (shouldCloseClassLoader) {
@@ -805,8 +802,4 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
}
}
}
-
- private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
- return FunctionsImpl.downloadPackageFile(worker(), packageName);
- }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 82d2043e9a9..44199dd9917 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -25,7 +25,6 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBui
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
import com.google.protobuf.ByteString;
import java.io.File;
-import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
@@ -54,7 +53,6 @@ import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
-import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -152,21 +150,7 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
// validate parameters
try {
if (isPkgUrlProvided) {
- if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
- componentPackageFile = downloadPackageFile(sourcePkgUrl);
- } else {
- if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
- throw new IllegalArgumentException(
- "Function Package url is not valid. supported url (http/https/file)");
- }
- try {
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format("Encountered error \"%s\" when getting %s package from %s",
- e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
- }
- }
+ componentPackageFile = getPackageFile(sourcePkgUrl);
functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
sourceConfig, componentPackageFile);
} else {
@@ -329,66 +313,23 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
- Function.FunctionDetails functionDetails = null;
+ Function.FunctionDetails functionDetails;
File componentPackageFile = null;
try {
// validate parameters
try {
- if (isNotBlank(sourcePkgUrl)) {
- if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
- componentPackageFile = downloadPackageFile(sourcePkgUrl);
- } else {
- try {
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format("Encountered error \"%s\" when getting %s package from %s",
- e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
- }
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
- mergedConfig, componentPackageFile);
-
- } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
- || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
- try {
- componentPackageFile = FunctionCommon
- .extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format("Encountered error \"%s\" when getting %s package from %s",
- e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
- mergedConfig, componentPackageFile);
- } else if (uploadedInputStream != null) {
-
- componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
- functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
- mergedConfig, componentPackageFile);
-
- } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
- functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
- mergedConfig, componentPackageFile);
- if (!isFunctionCodeBuiltin(functionDetails)
- && (componentPackageFile == null || fileDetail == null)) {
- throw new IllegalArgumentException(
- ComponentTypeUtils.toString(componentType) + " Package is not provided");
- }
- } else {
- componentPackageFile = FunctionCommon.createPkgTempFile();
- componentPackageFile.deleteOnExit();
- if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
- worker().getBrokerAdmin().packages().download(
- existingComponent.getPackageLocation().getPackagePath(),
- componentPackageFile.getAbsolutePath());
- } else {
- WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile,
- existingComponent.getPackageLocation().getPackagePath());
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
- mergedConfig, componentPackageFile);
+ componentPackageFile = getPackageFile(
+ sourcePkgUrl,
+ existingComponent.getPackageLocation().getPackagePath(),
+ uploadedInputStream);
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+ mergedConfig, componentPackageFile);
+ if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)
+ && !isFunctionCodeBuiltin(functionDetails)
+ && (componentPackageFile == null || fileDetail == null)) {
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType)
+ + " Package is not provided");
}
} catch (Exception e) {
log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
@@ -805,8 +746,4 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
}
}
}
-
- private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
- return FunctionsImpl.downloadPackageFile(worker(), packageName);
- }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 3efad1da1cc..092844db489 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -339,9 +339,12 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
@ApiParam(value = "The namespace of functions")
final @PathParam("namespace") String namespace,
@ApiParam(value = "The name of functions")
- final @PathParam("functionName") String functionName) {
+ final @PathParam("functionName") String functionName,
+ @ApiParam(value = "Whether to download the transform function")
+ final @QueryParam("transform-function") boolean transformFunction) {
- return functions().downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
+ return functions()
+ .downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData(), transformFunction);
}
@GET
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
index c305d64b9f3..db315f1181e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
@@ -172,7 +172,17 @@ public interface Component<W extends WorkerService> {
String namespace,
String componentName,
String clientRole,
- AuthenticationDataSource clientAuthenticationDataHttps);
+ AuthenticationDataSource clientAuthenticationDataHttps,
+ boolean transformFunction);
+
+    /**
+     * Downloads the component's own package (never its transform function package).
+     *
+     * @deprecated use the overload taking a {@code boolean transformFunction} argument and pass
+     *             {@code false} to obtain the same behavior.
+     */
+    @Deprecated
+    default StreamingOutput downloadFunction(String tenant,
+                                             String namespace,
+                                             String componentName,
+                                             String clientRole,
+                                             AuthenticationDataSource clientAuthenticationDataHttps) {
+        final boolean downloadTransformFunction = false;
+        return downloadFunction(
+                tenant,
+                namespace,
+                componentName,
+                clientRole,
+                clientAuthenticationDataHttps,
+                downloadTransformFunction);
+    }
@Deprecated
default StreamingOutput downloadFunction(String tenant,
@@ -185,7 +195,8 @@ public interface Component<W extends WorkerService> {
namespace,
componentName,
clientRole,
- (AuthenticationDataSource) clientAuthenticationDataHttps);
+ (AuthenticationDataSource) clientAuthenticationDataHttps,
+ false);
}
List<ConnectorDefinition> getListOfConnectors();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 9ccdc09bd7d..2412aebc294 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -78,7 +78,6 @@ public class FunctionActionerTest {
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
- Runtime runtime = mock(Runtime.class);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
.setNamespace("test-namespace").setName("func-1"))
@@ -115,7 +114,7 @@ public class FunctionActionerTest {
RuntimeFactory factory = mock(RuntimeFactory.class);
Runtime runtime = mock(Runtime.class);
- doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+ doReturn(runtime).when(factory).createContainer(any(), any(), any(), any(), any(), any());
doNothing().when(runtime).start();
Namespace dlogNamespace = mock(Namespace.class);
final String exceptionMsg = "dl namespace not-found";
@@ -128,39 +127,49 @@ public class FunctionActionerTest {
// (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
// RuntimeSpawner
String pkgPathLocation = FILE + ":/user/my-file.jar";
- Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
- .setNamespace("test-namespace").setName("func-1"))
- .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
- .build();
- Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
- .build();
- FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
- doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
-
- actioner.startFunction(functionRuntimeInfo);
+ startFunction(actioner, pkgPathLocation, pkgPathLocation);
verify(runtime, times(1)).start();
// (2) test with http-url, downloading file from http should fail with UnknownHostException due to invalid url
- pkgPathLocation = "http://invalid/my-file.jar";
- function1 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
- .setNamespace("test-namespace").setName("func-1"))
- .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
- .build();
- instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0).build();
- functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
- doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
- doThrow(new IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any());
+ String invalidPkgPathLocation = "http://invalid/my-file.jar";
try {
- actioner.startFunction(functionRuntimeInfo);
+ startFunction(actioner, invalidPkgPathLocation, pkgPathLocation);
+ fail();
+ } catch (IllegalStateException ex) {
+ assertEquals(ex.getMessage(), "StartupException");
+ }
+
+ try {
+ startFunction(actioner, pkgPathLocation, invalidPkgPathLocation);
fail();
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "StartupException");
}
}
+
+    /**
+     * Starts one instance of a test function through {@code actioner}.
+     *
+     * <p>The instance metadata carries {@code pkgPathLocation} as the function package location and
+     * {@code transformFunctionPkgPathLocation} as the transform function package location. Any call to
+     * {@code setStartupException} on the mocked {@link FunctionRuntimeInfo} throws
+     * {@code IllegalStateException("StartupException")}, which lets callers assert on the failure path.
+     *
+     * @param actioner the actioner under test
+     * @param pkgPathLocation package path of the function itself
+     * @param transformFunctionPkgPathLocation package path stored as the transform function package location
+     */
+    private void startFunction(FunctionActioner actioner, String pkgPathLocation,
+                               String transformFunctionPkgPathLocation) {
+        Function.FunctionDetails.Builder details = Function.FunctionDetails.newBuilder()
+                .setTenant("test-tenant")
+                .setNamespace("test-namespace")
+                .setName("func-1");
+        Function.FunctionMetaData metaData = Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(details)
+                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
+                .setTransformFunctionPackageLocation(
+                        PackageLocationMetaData.newBuilder().setPackagePath(transformFunctionPkgPathLocation).build())
+                .build();
+        Function.Instance functionInstance = Function.Instance.newBuilder()
+                .setFunctionMetaData(metaData)
+                .setInstanceId(0)
+                .build();
+
+        FunctionRuntimeInfo runtimeInfo = mock(FunctionRuntimeInfo.class);
+        doReturn(functionInstance).when(runtimeInfo).getFunctionInstance();
+        // Turn a recorded startup failure into an exception the calling test can catch.
+        doThrow(new IllegalStateException("StartupException")).when(runtimeInfo).setStartupException(any());
+
+        actioner.startFunction(runtimeInfo);
+    }
+
@Test
public void testFunctionAuthDisabled() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
@@ -177,7 +186,7 @@ public class FunctionActionerTest {
RuntimeFactory factory = mock(RuntimeFactory.class);
Runtime runtime = mock(Runtime.class);
- doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+ doReturn(runtime).when(factory).createContainer(any(), any(), any(), any(), any(), any());
doNothing().when(runtime).start();
Namespace dlogNamespace = mock(Namespace.class);
final String exceptionMsg = "dl namespace not-found";
@@ -198,7 +207,7 @@ public class FunctionActionerTest {
Function.Instance instance = Function.Instance.newBuilder()
.setFunctionMetaData(functionMeta).build();
- RuntimeSpawner runtimeSpawner = spy(actioner.getRuntimeSpawner(instance, "foo"));
+ RuntimeSpawner runtimeSpawner = spy(actioner.getRuntimeSpawner(instance, "foo", "bar"));
assertNull(runtimeSpawner.getInstanceConfig().getFunctionAuthenticationSpec());
@@ -236,7 +245,7 @@ public class FunctionActionerTest {
RuntimeFactory factory = mock(RuntimeFactory.class);
Runtime runtime = mock(Runtime.class);
- doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+ doReturn(runtime).when(factory).createContainer(any(), any(), any(), any(), any(), any());
doNothing().when(runtime).start();
Namespace dlogNamespace = mock(Namespace.class);
final String exceptionMsg = "dl namespace not-found";
@@ -253,17 +262,7 @@ public class FunctionActionerTest {
// (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
// RuntimeSpawner
String pkgPathLocation = "function://public/default/test-function@latest";
- Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
- .setNamespace("test-namespace").setName("func-1"))
- .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
- .build();
- Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
- .build();
- FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
- doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
-
- actioner.startFunction(functionRuntimeInfo);
+ startFunction(actioner, pkgPathLocation, pkgPathLocation);
verify(runtime, times(1)).start();
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index ceda6f89eb8..61237cdf481 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -716,7 +716,7 @@ public class FunctionRuntimeManagerTest {
doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();
KubernetesRuntime kubernetesRuntime = mock(KubernetesRuntime.class);
- doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any());
+ doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any(), any(), any());
FunctionActioner functionActioner = spy(new FunctionActioner(
workerConfig,
@@ -742,7 +742,9 @@ public class FunctionRuntimeManagerTest {
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
- .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("path").build())
+ .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("path"))
+ .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+ .setPackagePath("function-path"))
.setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
@@ -771,7 +773,8 @@ public class FunctionRuntimeManagerTest {
FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo()
.setFunctionInstance(instance)
.setRuntimeSpawner(functionActioner
- .getRuntimeSpawner(instance, function1.getPackageLocation().getPackagePath()));
+ .getRuntimeSpawner(instance, function1.getPackageLocation().getPackagePath(),
+ function1.getTransformFunctionPackageLocation().getPackagePath()));
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-1:0", functionRuntimeInfo);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 4b7e9940230..7ffebcb3216 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -152,7 +152,7 @@ public class FunctionsImplTest {
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null, null, null);
+ instanceConfig, null, null, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture =
new CompletableFuture<InstanceCommunication.MetricsData>();
metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
@@ -207,7 +207,7 @@ public class FunctionsImplTest {
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null, null, null);
+ instanceConfig, null, null, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> completableFuture =
new CompletableFuture<InstanceCommunication.MetricsData>();
completableFuture.complete(javaInstanceRunnable.getMetrics());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index a868d632506..8d19869b473 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -1688,6 +1688,66 @@ public class FunctionApiV3ResourceTest {
}
}
+    @Test
+    public void testDownloadFunctionByName() throws Exception {
+        downloadFunctionPackageAndVerify(false);
+    }
+
+    @Test
+    public void testDownloadTransformFunctionByName() throws Exception {
+        downloadFunctionPackageAndVerify(true);
+    }
+
+    /**
+     * Downloads either the function package or the transform function package of a registered function
+     * and checks that the local test resource ({@code test_worker_config.yml}) was streamed to disk.
+     *
+     * <p>Only the requested location points at the real file; the other one is {@code http://invalid},
+     * so resolving the wrong location is expected to make the download fail.
+     *
+     * @param transformFunction {@code true} to download the transform function package, {@code false}
+     *                          to download the function package itself
+     */
+    private void downloadFunctionPackageAndVerify(boolean transformFunction) throws Exception {
+        URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+        File file = Paths.get(fileUrl.toURI()).toFile();
+        String fileLocation = file.getAbsolutePath().replace('\\', '/');
+        String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        doReturn(true).when(mockedWorkerService).isInitialized();
+        WorkerConfig config = mock(WorkerConfig.class);
+        when(config.isAuthorizationEnabled()).thenReturn(false);
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+        PackageLocationMetaData.Builder validLocation = PackageLocationMetaData.newBuilder()
+                .setPackagePath("file:///" + fileLocation);
+        PackageLocationMetaData.Builder invalidLocation = PackageLocationMetaData.newBuilder()
+                .setPackagePath("http://invalid");
+        FunctionMetaData metaData = FunctionMetaData.newBuilder()
+                .setPackageLocation(transformFunction ? invalidLocation : validLocation)
+                .setTransformFunctionPackageLocation(transformFunction ? validLocation : invalidLocation)
+                .build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
+
+        StreamingOutput streamOutput =
+                resource.downloadFunction(tenant, namespace, function, null, null, transformFunction);
+        File pkgFile = new File(testDir, UUID.randomUUID().toString());
+        try {
+            // Close the stream before inspecting or deleting the file (an open handle blocks deletion on Windows).
+            try (OutputStream output = new FileOutputStream(pkgFile)) {
+                streamOutput.write(output);
+            }
+            Assert.assertTrue(pkgFile.exists());
+            // exists() alone is vacuous because FileOutputStream creates the file; also check the copied size.
+            Assert.assertEquals(pkgFile.length(), file.length());
+        } finally {
+            pkgFile.delete();
+        }
+    }
+
@Test
public void testRegisterFunctionFileUrlWithValidSinkClass() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 791273cf537..7fd0fb1030f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -20,9 +20,11 @@ package org.apache.pulsar.functions.worker.rest.api.v3;
import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -63,6 +65,8 @@ import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.functions.api.examples.ExclamationFunction;
+import org.apache.pulsar.functions.api.examples.RecordFunction;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
@@ -71,11 +75,14 @@ import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.LeaderService;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -635,13 +642,7 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
- SinkConfig sinkConfig = new SinkConfig();
- sinkConfig.setTenant(tenant);
- sinkConfig.setNamespace(namespace);
- sinkConfig.setName(sink);
- sinkConfig.setClassName(CASSANDRA_STRING_SINK);
- sinkConfig.setParallelism(parallelism);
- sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+ SinkConfig sinkConfig = createDefaultSinkConfig();
try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) {
resource.registerSink(
actualTenant,
@@ -710,6 +711,107 @@ public class SinkApiV3ResourceTest {
}
}
+    @Test
+    public void testRegisterSinkSuccessWithTransformFunction() throws Exception {
+        mockInstanceUtils();
+        mockWorkerUtils();
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+        mockBuiltinTransformFunction(RecordFunction.class);
+
+        registerDefaultSinkWithTransformFunction();
+    }
+
+    @Test(expectedExceptions = RestException.class,
+            expectedExceptionsMessageRegExp = "Sink transform function output must be of type Record")
+    public void testRegisterSinkFailureWithInvalidTransformFunction() throws Exception {
+        mockInstanceUtils();
+        mockWorkerUtils();
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+        mockBuiltinTransformFunction(ExclamationFunction.class);
+
+        try {
+            registerDefaultSinkWithTransformFunction();
+        } catch (RestException e) {
+            // Expected: check the status here, then rethrow so TestNG also verifies the message.
+            assertEquals(e.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw e;
+        }
+    }
+
+    /**
+     * Makes the builtin function named {@code transform} resolve to {@code functionClass}.
+     *
+     * <p>A mocked {@link NarClassLoader} loads {@code functionClass} under its simple name. The listed
+     * {@link FunctionCommon} helpers keep their real implementations, while {@code extractNarClassLoader}
+     * returns the mocked loader. {@link FunctionUtils#getFunctionClass} reports the simple name, and the
+     * worker's {@link FunctionsManager} serves an archive backed by the mocked loader.
+     *
+     * @param functionClass the class the {@code transform} function should resolve to
+     */
+    private void mockBuiltinTransformFunction(Class<?> functionClass) throws Exception {
+        String className = functionClass.getSimpleName();
+        NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
+        doReturn(functionClass).when(mockedClassLoader).loadClass(className);
+        mockStatic(FunctionCommon.class, ctx -> {
+            ctx.when(FunctionCommon::createPkgTempFile).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getRawFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getFunctionClassParent(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);
+        });
+        mockStatic(FunctionUtils.class, ctx -> {
+            ctx.when(() -> FunctionUtils.getFunctionClass(any())).thenReturn(className);
+        });
+
+        FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder()
+                .classLoader(mockedClassLoader)
+                .build();
+        when(mockedFunctionsManager.getFunction("transform")).thenReturn(functionArchive);
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
+    }
+
+    /**
+     * Registers the default sink config, using the Cassandra connector NAR, with
+     * {@code builtin://transform} as its transform function.
+     */
+    private void registerDefaultSinkWithTransformFunction() throws Exception {
+        SinkConfig sinkConfig = createDefaultSinkConfig();
+        sinkConfig.setTransformFunction("builtin://transform");
+        sinkConfig.setTransformFunctionConfig("{\"dummy\": \"dummy\"}");
+
+        try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) {
+            resource.registerSink(
+                    tenant,
+                    namespace,
+                    sink,
+                    inputStream,
+                    mockedFormData,
+                    null,
+                    sinkConfig,
+                    null, null);
+        }
+    }
+
//
// Update Functions
//
@@ -882,7 +984,7 @@ public class SinkApiV3ResourceTest {
this.mockedFunctionMetaData =
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
- when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(mockedFunctionMetaData);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
@@ -928,13 +1030,7 @@ public class SinkApiV3ResourceTest {
}
private void updateDefaultSinkWithPackageUrl(String packageUrl) throws Exception {
- SinkConfig sinkConfig = new SinkConfig();
- sinkConfig.setTenant(tenant);
- sinkConfig.setNamespace(namespace);
- sinkConfig.setName(sink);
- sinkConfig.setClassName(CASSANDRA_STRING_SINK);
- sinkConfig.setParallelism(parallelism);
- sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+ SinkConfig sinkConfig = createDefaultSinkConfig();
mockStatic(ConnectorUtils.class, ctx -> {
ctx.when(() -> ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)))
@@ -1019,13 +1115,7 @@ public class SinkApiV3ResourceTest {
String filePackageUrl = getPulsarIOCassandraNar().toURI().toString();
- SinkConfig sinkConfig = new SinkConfig();
- sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
- sinkConfig.setTenant(tenant);
- sinkConfig.setNamespace(namespace);
- sinkConfig.setName(sink);
- sinkConfig.setClassName(CASSANDRA_STRING_SINK);
- sinkConfig.setParallelism(parallelism);
+ SinkConfig sinkConfig = createDefaultSinkConfig();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
@@ -1115,6 +1205,54 @@ public class SinkApiV3ResourceTest {
}
}
+    /**
+     * Updates an existing sink so that it uses the builtin {@code transform} function with an explicit
+     * transform class name ({@code DummyFunction}). The mocked class loader resolves that name to
+     * {@link RecordFunction}. The test only checks that {@code updateSink} completes without throwing.
+     */
+    @Test
+    public void testUpdateSinkDifferentTransformFunction() throws Exception {
+        mockWorkerUtils();
+
+        final String transformClassName = "DummyFunction";
+        SinkConfig updatedConfig = createDefaultSinkConfig();
+        updatedConfig.setTransformFunction("builtin://transform");
+        updatedConfig.setTransformFunctionClassName(transformClassName);
+        updatedConfig.setTransformFunctionConfig("{\"dummy\": \"dummy\"}");
+
+        NarClassLoader transformClassLoader = mock(NarClassLoader.class);
+        doReturn(RecordFunction.class).when(transformClassLoader).loadClass(transformClassName);
+        // Only the listed FunctionCommon helpers run their real code; NAR extraction yields the mocked loader.
+        mockStatic(FunctionCommon.class, statics -> {
+            statics.when(FunctionCommon::createPkgTempFile).thenCallRealMethod();
+            statics.when(() -> FunctionCommon.getRawFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            statics.when(() -> FunctionCommon.getFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            statics.when(() -> FunctionCommon.getFunctionClassParent(any(), anyBoolean())).thenCallRealMethod();
+            statics.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any()))
+                    .thenCallRealMethod();
+            statics.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(transformClassLoader);
+        });
+
+        // The sink already exists; its stored metadata is built from the default sink config.
+        this.mockedFunctionMetaData = FunctionMetaData.newBuilder()
+                .setFunctionDetails(createDefaultFunctionDetails())
+                .build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
+                .thenReturn(mockedFunctionMetaData);
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        // The builtin "transform" archive is served through the worker's FunctionsManager.
+        FunctionArchive transformArchive = FunctionArchive.builder()
+                .classLoader(transformClassLoader)
+                .build();
+        FunctionsManager functionsManager = mock(FunctionsManager.class);
+        when(functionsManager.getFunction("transform")).thenReturn(transformArchive);
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
+
+        try (FileInputStream narStream = new FileInputStream(getPulsarIOCassandraNar())) {
+            resource.updateSink(tenant, namespace, sink, narStream, mockedFormData, null, updatedConfig,
+                    null, null, null);
+        }
+    }
+
//
// deregister sink
//
@@ -1238,7 +1376,7 @@ public class SinkApiV3ResourceTest {
}
@Test
- public void testDeregisterSinkBKPackageCleanup() throws IOException {
+ public void testDeregisterSinkBKPackageCleanup() {
mockInstanceUtils();
try (final MockedStatic<WorkerUtils> ctx = Mockito.mockStatic(WorkerUtils.class)) {
@@ -1246,15 +1384,20 @@ public class SinkApiV3ResourceTest {
String packagePath =
"public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar";
+ String transformFunctionPackagePath =
+ "public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-test-function.nar";
+ FunctionMetaData functionMetaData = FunctionMetaData.newBuilder()
+ .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath))
+ .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+ .setPackagePath(transformFunctionPackagePath))
+ .build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
- .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
- Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+ .thenReturn(functionMetaData);
deregisterDefaultSink();
- ctx.verify(() -> {
- WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
- }, times(1));
+ ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)), times(1));
+ ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), eq(transformFunctionPackagePath)), times(1));
}
}
@@ -1266,16 +1409,19 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
String packagePath = String.format("%s://data-generator", Utils.BUILTIN);
+ String transformFunctionPackagePath = String.format("%s://exclamation", Utils.BUILTIN);
+ FunctionMetaData functionMetaData = FunctionMetaData.newBuilder()
+ .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath))
+ .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+ .setPackagePath(transformFunctionPackagePath))
+ .build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
- .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
- Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+ .thenReturn(functionMetaData);
deregisterDefaultSink();
// if the sink is a builtin sink we shouldn't try to clean it up
- ctx.verify(() -> {
- WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
- }, times(0));
+ ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), anyString()), times(0));
}
}
@@ -1288,22 +1434,25 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
String packagePath = "http://foo.com/connector.jar";
+ String transformFunctionPackagePath = "http://foo.com/function.jar";
+ FunctionMetaData functionMetaData = FunctionMetaData.newBuilder()
+ .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath))
+ .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+ .setPackagePath(transformFunctionPackagePath))
+ .build();
+
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
- .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
- Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+ .thenReturn(functionMetaData);
deregisterDefaultSink();
// if the sink is a is download from a http url, we shouldn't try to clean it up
- ctx.verify(() -> {
- WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
- }, times(0));
-
+ ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), anyString()), times(0));
}
}
@Test
- public void testDeregisterFileSinkBKPackageCleanup() throws IOException {
+ public void testDeregisterFileSinkBKPackageCleanup() {
mockInstanceUtils();
try (final MockedStatic<WorkerUtils> ctx = Mockito.mockStatic(WorkerUtils.class)) {
@@ -1311,16 +1460,20 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
String packagePath = "file://foo/connector.jar";
+ String transformFunctionPackagePath = "file://foo/function.jar";
+ FunctionMetaData functionMetaData = FunctionMetaData.newBuilder()
+ .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath))
+ .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+ .setPackagePath(transformFunctionPackagePath))
+ .build();
+
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
- .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
- Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+ .thenReturn(functionMetaData);
deregisterDefaultSink();
// if the sink package has a file url, we shouldn't try to clean it up
- ctx.verify(() -> {
- WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
- }, times(0));
+ ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)), times(0));
}
}
@@ -1574,7 +1727,7 @@ public class SinkApiV3ResourceTest {
+    /**
+     * Converts the default sink config into {@link FunctionDetails} via {@code SinkConfigUtils.convert}.
+     * All three {@code ExtractedSinkDetails} arguments are passed as null (presumably meaning no
+     * class or type information was extracted for these tests; TODO confirm).
+     */
private FunctionDetails createDefaultFunctionDetails() throws IOException {
return SinkConfigUtils.convert(createDefaultSinkConfig(),
- new SinkConfigUtils.ExtractedSinkDetails(null, null));
+ new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
}
/*
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestLoggingSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestLoggingSink.java
new file mode 100644
index 00000000000..571eb6a9d1a
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestLoggingSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+
+public class TestLoggingSink implements Sink<GenericObject> {
+
+    private Logger logger;
+    private Producer<String> producer;
+
+    /**
+     * Creates a {@code String} producer on the topic named by the {@code log-topic} entry of the sink configs.
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        logger = sinkContext.getLogger();
+        String topic = (String) sinkContext.getSinkConfig().getConfigs().get("log-topic");
+        producer = sinkContext.getPulsarClient().newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+    }
+
+    /**
+     * Logs the record, publishes {@code "<SCHEMA_TYPE> - <payload>"} to the log topic with the record's
+     * properties copied onto the message, and then acks the record. {@link KeyValue} payloads are rendered as
+     * {@code "(key = ..., value = ...)"}, with {@link GenericObject} components unwrapped to their native objects.
+     */
+    @Override
+    public void write(Record<GenericObject> record) throws Exception {
+        Object nativeObject = record.getValue().getNativeObject();
+        logger.info("Got message: {} with schema {}", nativeObject, record.getSchema());
+        String payload = String.valueOf(nativeObject);
+        if (nativeObject instanceof KeyValue) {
+            KeyValue<?, ?> kv = (KeyValue<?, ?>) nativeObject;
+            payload = "(key = " + render(kv.getKey()) + ", value = " + render(kv.getValue()) + ")";
+        }
+        producer.newMessage()
+                .properties(record.getProperties())
+                .value(record.getSchema().getSchemaInfo().getType().name() + " - " + payload)
+                .send();
+        record.ack();
+    }
+
+    /**
+     * Returns a null-safe string form of a {@link KeyValue} component, unwrapping a {@link GenericObject}
+     * to its native object.
+     */
+    private static String render(Object component) {
+        if (component instanceof GenericObject) {
+            return String.valueOf(((GenericObject) component).getNativeObject());
+        }
+        return String.valueOf(component);
+    }
+
+    /** Closes the log-topic producer created in {@link #open}, if it was created. */
+    @Override
+    public void close() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkWithTransformFunctionTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkWithTransformFunctionTest.java
new file mode 100644
index 00000000000..b5faf59d25a
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkWithTransformFunctionTest.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.functions.api.examples.pojo.Users;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Network;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for sinks configured with a transform function. Each test uploads the
+ * function jar as a package and creates a {@code TestLoggingSink} whose records first pass
+ * through the transform function. The sink republishes a textual rendering of each record on a
+ * log topic, which the test consumes and checks.
+ */
+@Slf4j
+public class SinkWithTransformFunctionTest extends PulsarStandaloneTestSuite {
+
+    private static final int NUM_RECORDS = 10;
+    private static final int RECEIVE_TIMEOUT_SECONDS = 5;
+    private static final int SINK_STATUS_MAX_ATTEMPTS = 30;
+    private static final String LOGGING_SINK_CLASS =
+            "org.apache.pulsar.tests.integration.io.TestLoggingSink";
+    private static final String RECORD_FUNCTION_CLASS =
+            "org.apache.pulsar.functions.api.examples.RecordFunction";
+    private static final String REMOVE_AVRO_FIELD_FUNCTION_CLASS =
+            "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldRecordFunction";
+
+    /**
+     * Starts the standalone container with the image's default (PIP-117) configuration, so that
+     * the package management service used to upload the transform functions is enabled.
+     */
+    @Override
+    public void setUpCluster() throws Exception {
+        incrementSetupNumber();
+        network = Network.newNetwork();
+        String clusterName = PulsarClusterTestBase.randomName(8);
+        container = new StandaloneContainer(clusterName, PulsarContainer.DEFAULT_IMAGE_NAME)
+                .withNetwork(network)
+                .withNetworkAliases(StandaloneContainer.NAME + "-" + clusterName)
+                .withEnv("PF_stateStorageServiceUrl", "bk://localhost:4181");
+        container.start();
+        log.info("Pulsar cluster {} is up running:", clusterName);
+        log.info("\tBinary Service Url : {}", container.getPlainTextServiceUrl());
+        log.info("\tHttp Service Url : {}", container.getHttpServiceUrl());
+
+        // Sanity check: the public/default namespace exists and its policies can be read.
+        ContainerExecResult result = container.execCmd(
+                "/pulsar/bin/pulsar-admin", "namespaces", "policies", "public/default");
+        // TestNG argument order is (actual, expected).
+        assertEquals(result.getExitCode(), 0, result.getStderr());
+        log.info("public/default namespace policies are {}", result.getStdout());
+    }
+
+    @Test(groups = {"sink"})
+    public void testSinkWithTransformFunction() throws Exception {
+        @Cleanup PulsarClient client = newClient();
+        String sinkName = "sink-with-function";
+        deploySinkWithTransformFunction(sinkName, RECORD_FUNCTION_CLASS);
+
+        try {
+            @Cleanup Consumer<String> consumer = subscribeToLogTopic(client, sinkName);
+            @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+                    .topic(sinkName)
+                    .create();
+
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                producer.send(i + "-test");
+            }
+
+            log.info("waiting for sink {}", sinkName);
+            // The transform function's output is expected to be the input suffixed with "!".
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                assertNextMessage(consumer, "STRING - " + i + "-test!");
+            }
+        } finally {
+            dumpFunctionLogs(sinkName);
+        }
+
+        deleteSink(sinkName);
+        getSinkInfoNotFound(sinkName);
+    }
+
+    @Test(groups = {"sink"})
+    public void testGenericObjectSinkWithTransformFunction() throws Exception {
+        @Cleanup PulsarClient client = newClient();
+        String sinkName = "sink-with-genericobject-function";
+        deploySinkWithTransformFunction(sinkName, REMOVE_AVRO_FIELD_FUNCTION_CLASS);
+
+        try {
+            @Cleanup Consumer<String> consumer = subscribeToLogTopic(client, sinkName);
+            @Cleanup Producer<Users.UserV1> producerV1 = client.newProducer(Schema.AVRO(Users.UserV1.class))
+                    .topic(sinkName)
+                    .create();
+
+            // The sink is expected to receive UserV1 records with the "age" field removed.
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                producerV1.send(new Users.UserV1("foo" + i, i));
+            }
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                assertNextMessage(consumer, "AVRO - {\"name\": \"foo" + i + "\"}");
+            }
+
+            // Schema evolution: switch to UserV2 ...
+            @Cleanup Producer<Users.UserV2> producerV2 = client.newProducer(Schema.AVRO(Users.UserV2.class))
+                    .topic(sinkName)
+                    .create();
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                producerV2.send(new Users.UserV2("foo" + i, i, "bar" + i));
+            }
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                assertNextMessage(consumer, "AVRO - {\"name\": \"foo" + i + "\", \"phone\": \"bar" + i + "\"}");
+            }
+
+            // ... and back to UserV1.
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                producerV1.send(new Users.UserV1("foo" + i, i));
+            }
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                assertNextMessage(consumer, "AVRO - {\"name\": \"foo" + i + "\"}");
+            }
+        } finally {
+            dumpFunctionLogs(sinkName);
+        }
+
+        deleteSink(sinkName);
+        getSinkInfoNotFound(sinkName);
+    }
+
+    @Test(groups = {"sink"})
+    public void testKeyValueSinkWithTransformFunction() throws Exception {
+        @Cleanup PulsarClient client = newClient();
+        String sinkName = "sink-with-kv-function";
+        deploySinkWithTransformFunction(sinkName, REMOVE_AVRO_FIELD_FUNCTION_CLASS);
+
+        try {
+            @Cleanup Consumer<String> consumer = subscribeToLogTopic(client, sinkName);
+            @Cleanup Producer<KeyValue<Users.UserV1, Users.UserV1>> producer = client
+                    .newProducer(Schema.KeyValue(
+                            Schema.AVRO(Users.UserV1.class),
+                            Schema.AVRO(Users.UserV1.class), KeyValueEncodingType.SEPARATED))
+                    .topic(sinkName)
+                    .create();
+
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                producer.send(new KeyValue<>(new Users.UserV1("foo" + i, i),
+                        new Users.UserV1("bar" + i, i + 100)));
+            }
+
+            // Only the value's "age" field is expected to be removed; the key is left intact.
+            for (int i = 0; i < NUM_RECORDS; i++) {
+                assertNextMessage(consumer, "KEY_VALUE - (key = {\"age\": " + i
+                        + ", \"name\": \"foo" + i + "\"}, value = "
+                        + "{\"name\": \"bar" + i + "\"})");
+            }
+        } finally {
+            dumpFunctionLogs(sinkName);
+        }
+
+        deleteSink(sinkName);
+        getSinkInfoNotFound(sinkName);
+    }
+
+    /** Creates a client connected to the standalone container's binary service URL. */
+    private PulsarClient newClient() throws Exception {
+        return PulsarClient.builder()
+                .serviceUrl(container.getPlainTextServiceUrl())
+                .build();
+    }
+
+    /** Name of the topic that {@code TestLoggingSink} is configured to publish to for a sink. */
+    private static String logTopicName(String sinkName) {
+        return "log-" + sinkName;
+    }
+
+    /** Subscribes to the log topic of {@code sinkName}; must happen before records are produced. */
+    private static Consumer<String> subscribeToLogTopic(PulsarClient client, String sinkName) throws Exception {
+        return client.newConsumer(Schema.STRING)
+                .topic(logTopicName(sinkName))
+                .subscriptionName("sub")
+                .subscribe();
+    }
+
+    /** Receives the next log message and asserts that it equals {@code expected}. */
+    private static void assertNextMessage(Consumer<String> consumer, String expected) throws Exception {
+        Message<String> received = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertNotNull(received, "Timed out waiting for message: " + expected);
+        assertEquals(received.getValue(), expected);
+    }
+
+    /**
+     * Uploads the function jar as a package and creates a {@code TestLoggingSink} that:
+     * <ul>
+     *   <li>is named {@code sinkName};</li>
+     *   <li>reads the topic {@code sinkName};</li>
+     *   <li>applies {@code transformFunctionClassName} from that package;</li>
+     *   <li>logs to {@link #logTopicName(String)}.</li>
+     * </ul>
+     * Returns once the sink reports itself running.
+     */
+    private void deploySinkWithTransformFunction(String sinkName, String transformFunctionClassName)
+            throws Exception {
+        String packageName = "function://public/default/" + sinkName + "-function@1.0";
+        submitPackage(packageName, "package-function", JAVAJAR);
+        submitSinkConnector(
+                sinkName,
+                sinkName,
+                LOGGING_SINK_CLASS,
+                JAVAJAR,
+                "{\"log-topic\": \"" + logTopicName(sinkName) + "\"}",
+                packageName,
+                transformFunctionClassName);
+        getSinkInfoSuccess(sinkName);
+        waitForSinkRunning(sinkName);
+    }
+
+    private void submitPackage(String packageName, String description, String packagePath) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "packages", "upload",
+                packageName,
+                "--description", description,
+                "--path", packagePath
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("successfully"),
+                result.getStdout());
+    }
+
+    private void submitSinkConnector(String sinkName,
+                                     String inputTopicName,
+                                     String className,
+                                     String archive,
+                                     String configs,
+                                     String transformFunction,
+                                     String transformFunctionClassName) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks", "create",
+                "--name", sinkName,
+                "-i", inputTopicName,
+                "--archive", archive,
+                "--classname", className,
+                "--sink-config", configs,
+                "--transform-function", transformFunction,
+                "--transform-function-classname", transformFunctionClassName
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("Created successfully"),
+                result.getStdout());
+    }
+
+    private void getSinkInfoSuccess(String sinkName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks",
+                "get",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sinkName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\""), result.getStdout());
+    }
+
+    /**
+     * Polls {@code sinks status} until the sink reports {@code "running" : true}, failing after
+     * {@link #SINK_STATUS_MAX_ATTEMPTS} one-second attempts. The instance may not be running yet
+     * when {@code sinks create} returns, so a single immediate check can be flaky.
+     */
+    private void waitForSinkRunning(String sinkName) throws Exception {
+        String lastOutput = "";
+        for (int attempt = 1; attempt <= SINK_STATUS_MAX_ATTEMPTS; attempt++) {
+            try {
+                ContainerExecResult result = container.execCmd(
+                        PulsarCluster.ADMIN_SCRIPT,
+                        "sinks",
+                        "status",
+                        "--tenant", "public",
+                        "--namespace", "default",
+                        "--name", sinkName
+                );
+                log.info(result.getStdout());
+                log.info(result.getStderr());
+                if (result.getStdout().contains("\"running\" : true")) {
+                    return;
+                }
+                lastOutput = result.getStdout();
+            } catch (ContainerExecException e) {
+                // A non-zero exit is treated as "not running yet"; retry until attempts run out.
+                lastOutput = e.getResult().getStderr();
+            }
+            TimeUnit.SECONDS.sleep(1);
+        }
+        fail("Sink " + sinkName + " is not running after " + SINK_STATUS_MAX_ATTEMPTS
+                + " attempts: " + lastOutput);
+    }
+
+    private void deleteSink(String sinkName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks",
+                "delete",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sinkName
+        );
+        assertTrue(result.getStdout().contains("successfully"), result.getStdout());
+        result.assertNoStderr();
+    }
+
+    private void getSinkInfoNotFound(String sinkName) throws Exception {
+        try {
+            container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sinks",
+                    "get",
+                    "--tenant", "public",
+                    "--namespace", "default",
+                    "--name", sinkName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            String stderr = e.getResult().getStderr();
+            assertTrue(stderr.contains(sinkName + " doesn't exist"), stderr);
+        }
+    }
+}
+