You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/04/01 16:52:42 UTC
[pulsar] branch master updated: Refactor and consolidate Utils classes (#3952)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6f70043 Refactor and consolidate Utils classes (#3952)
6f70043 is described below
commit 6f70043183e2c3e7259ba515266a97caa711b321
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Apr 1 11:52:36 2019 -0500
Refactor and consolidate Utils classes (#3952)
* Consolidating utils classes
* further refactoring
* adding missing class
* add license header
---
.../org/apache/pulsar/broker/PulsarService.java | 4 +-
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 100 +++++++--------
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 4 +-
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 4 +-
.../org/apache/pulsar/admin/cli/TestCmdSinks.java | 4 +-
.../apache/pulsar/admin/cli/TestCmdSources.java | 3 +-
.../pulsar/functions/instance/ContextImpl.java | 11 +-
.../pulsar/functions/instance/InstanceUtils.java | 16 +--
.../functions/instance/JavaInstanceRunnable.java | 20 ++-
.../apache/pulsar/functions/instance/Utils.java | 57 ---------
.../instance/stats/ComponentStatsManager.java | 4 +-
.../pulsar/functions/source/PulsarRecord.java | 4 +-
.../pulsar/functions/instance/ContextImplTest.java | 5 +-
.../pulsar/functions/instance/UtilsTest.java | 55 ---------
.../auth/KubernetesSecretsTokenAuthProvider.java | 4 +-
.../functions/runtime/KubernetesRuntime.java | 14 +--
.../pulsar/functions/runtime/LocalRunner.java | 4 +-
.../pulsar/functions/runtime/ProcessRuntime.java | 6 +-
.../pulsar/functions/runtime/RuntimeSpawner.java | 4 +-
.../pulsar/functions/runtime/RuntimeUtils.java | 9 +-
.../pulsar/functions/runtime/ThreadRuntime.java | 4 +-
.../functions/runtime/KubernetesRuntimeTest.java | 4 +-
.../functions/runtime/ProcessRuntimeTest.java | 4 +-
.../{package-info.java => ComponentType.java} | 21 +++-
.../utils/{Utils.java => FunctionCommon.java} | 89 +++++++++++---
.../functions/utils/FunctionConfigUtils.java | 18 +--
.../functions/utils/FunctionDetailsUtils.java | 71 -----------
.../apache/pulsar/functions/utils/Reflections.java | 6 +-
.../pulsar/functions/utils/SinkConfigUtils.java | 12 +-
.../pulsar/functions/utils/SourceConfigUtils.java | 12 +-
.../apache/pulsar/functions/utils/StateUtils.java | 40 ------
.../pulsar/functions/utils/ValidatorUtils.java | 6 +-
.../utils/functioncache/package-info.java | 23 ----
.../{UtilsTest.java => FunctionCommonTest.java} | 52 +++++++-
.../pulsar/functions/worker/FunctionActioner.java | 42 +++++--
.../functions/worker/FunctionRuntimeManager.java | 25 ++--
.../pulsar/functions/worker/MembershipManager.java | 13 +-
.../pulsar/functions/worker/SchedulerManager.java | 9 +-
.../org/apache/pulsar/functions/worker/Worker.java | 4 +-
.../pulsar/functions/worker/WorkerService.java | 15 +--
.../worker/{Utils.java => WorkerUtils.java} | 48 +++-----
.../functions/worker/rest/api/ComponentImpl.java | 39 +++---
.../functions/worker/rest/api/FunctionsImpl.java | 4 +-
.../functions/worker/rest/api/FunctionsImplV2.java | 7 +-
.../pulsar/functions/worker/rest/api/SinkImpl.java | 4 +-
.../functions/worker/rest/api/SourceImpl.java | 4 +-
.../functions/worker/rest/api/WorkerImpl.java | 9 +-
.../worker/FunctionRuntimeManagerTest.java | 10 +-
.../functions/worker/SchedulerManagerTest.java | 36 +++---
.../apache/pulsar/functions/worker/UtilsTest.java | 43 -------
.../worker/request/ServiceRequestManagerTest.java | 4 +-
.../worker/rest/api/FunctionsImplTest.java | 8 +-
.../rest/api/v2/FunctionApiV2ResourceTest.java | 92 +++++++-------
.../rest/api/v3/FunctionApiV3ResourceTest.java | 94 +++++++--------
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 134 +++++++++++----------
.../rest/api/v3/SourceApiV3ResourceTest.java | 134 +++++++++++----------
56 files changed, 639 insertions(+), 833 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 400d207..9cd6be8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -94,7 +94,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
@@ -1036,7 +1036,7 @@ public class PulsarService implements AutoCloseable {
URI dlogURI;
try {
// initializing dlog namespace for function worker
- dlogURI = Utils.initializeDlogNamespace(
+ dlogURI = WorkerUtils.initializeDlogNamespace(
internalConf.getZookeeperServers(),
internalConf.getLedgersRootPath());
} catch (IOException ioe) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index abd9684..cd14acd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -51,7 +51,7 @@ import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.instance.InstanceUtils;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -504,63 +504,63 @@ public class PulsarFunctionE2ETest {
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_written_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_sink_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
int totalMsgs = 10;
@@ -587,63 +587,63 @@ public class PulsarFunctionE2ETest {
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_written_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_sink_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
@@ -714,63 +714,63 @@ public class PulsarFunctionE2ETest {
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_written_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_source_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_source_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
}
@@ -870,77 +870,77 @@ public class PulsarFunctionE2ETest {
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_user_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_user_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_process_latency_ms");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, Double.NaN);
m = metrics.get("pulsar_function_process_latency_ms_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, Double.NaN);
m = metrics.get("pulsar_function_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_processed_successfully_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_processed_successfully_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
@@ -1028,77 +1028,77 @@ public class PulsarFunctionE2ETest {
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_function_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_function_user_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_user_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_process_latency_ms");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_function_process_latency_ms_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_function_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_function_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_function_processed_successfully_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_function_processed_successfully_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, (double) totalMsgs);
// delete functions
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index a37786f..aba83a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -226,7 +226,7 @@ public class PulsarFunctionTlsTest {
File file = new File(jarFile);
try {
- Utils.loadJar(file);
+ FunctionCommon.loadJar(file);
} catch (MalformedURLException e) {
throw new RuntimeException("Failed to load user jar " + file, e);
}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index a64c38c..4df3312 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -58,7 +58,7 @@ import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
@@ -70,7 +70,7 @@ import org.testng.annotations.Test;
* Unit test of {@link CmdFunctions}.
*/
@Slf4j
-@PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class, Utils.class})
+@PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class, FunctionCommon.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
public class CmdFunctionsTest {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 2bb3877..5d8a303 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -43,7 +43,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -120,7 +120,7 @@ public class TestCmdSinks {
throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
}
JAR_FILE_PATH = file.getFile();
- Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
+ Thread.currentThread().setContextClassLoader(FunctionCommon.loadJar(new File(JAR_FILE_PATH)));
}
public SinkConfig getSinkConfig() {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 77e26a5..ac2dab9 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -42,7 +42,6 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.functions.utils.*;
-import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -101,7 +100,7 @@ public class TestCmdSources {
mockStatic(CmdFunctions.class);
PowerMockito.doNothing().when(localSourceRunner).runCmd();
JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
- Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
+ Thread.currentThread().setContextClassLoader(FunctionCommon.loadJar(new File(JAR_FILE_PATH)));
}
public SourceConfig getSourceConfig() {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 5400b41..bb70b41 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -44,8 +44,8 @@ import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
@@ -54,7 +54,6 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -98,11 +97,11 @@ class ContextImpl implements Context, SinkContext, SourceContext {
userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1);
userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
}
- private final Utils.ComponentType componentType;
+ private final ComponentType componentType;
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
- Utils.ComponentType componentType, ComponentStatsManager statsManager) {
+ ComponentType componentType, ComponentStatsManager statsManager) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
@@ -335,7 +334,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topicName)
.properties(InstanceUtils.getProperties(componentType,
- FunctionDetailsUtils.getFullyQualifiedName(
+ FunctionCommon.getFullyQualifiedName(
this.config.getFunctionDetails().getTenant(),
this.config.getFunctionDetails().getNamespace(),
this.config.getFunctionDetails().getName()),
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 9db47cf..5cae6a8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -20,9 +20,9 @@ package org.apache.pulsar.functions.instance;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
import lombok.experimental.UtilityClass;
@@ -31,11 +31,11 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.Reflections;
import net.jodah.typetools.TypeResolver;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import java.util.HashMap;
import java.util.Map;
@@ -89,7 +89,7 @@ public class InstanceUtils {
}
}
- public Utils.ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
+ public ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
Function.SourceSpec sourceSpec = functionDetails.getSource();
Function.SinkSpec sinkSpec = functionDetails.getSink();
if (sourceSpec.getInputSpecsCount() == 0) {
@@ -109,7 +109,7 @@ public class InstanceUtils {
}
public static String getDefaultSubscriptionName(String tenant, String namespace, String name) {
- return FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, name);
+ return FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
}
public static String getDefaultSubscriptionName(Function.FunctionDetails functionDetails) {
@@ -119,7 +119,7 @@ public class InstanceUtils {
functionDetails.getName());
}
- public static Map<String, String> getProperties(Utils.ComponentType componentType,
+ public static Map<String, String> getProperties(ComponentType componentType,
String fullyQualifiedName, int instanceId) {
Map<String, String> properties = new HashMap<>();
switch (componentType) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 8a60537..6b265ae 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -66,10 +66,9 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.StateUtils;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
@@ -80,7 +79,6 @@ import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -131,7 +129,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private InstanceCache instanceCache;
- private final Utils.ComponentType componentType;
+ private final ComponentType componentType;
private final Map<String, String> properties;
@@ -156,13 +154,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
instanceConfig.getFunctionDetails().getName(),
String.valueOf(instanceConfig.getInstanceId()),
instanceConfig.getClusterName(),
- FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())
+ FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())
};
this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
this.properties = InstanceUtils.getProperties(this.componentType,
- FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
+ FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
this.instanceConfig.getInstanceId());
// Declare function local collector registry so that it will not clash with other function instances'
@@ -176,7 +174,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
*/
JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
// initialize the thread context
- ThreadContext.put("function", FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
+ ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
ThreadContext.put("instance", instanceConfig.getInstanceName());
@@ -276,7 +274,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
}
} catch (Throwable t) {
- log.error("[{}] Uncaught exception in Java Instance", Utils.getFullyQualifiedInstanceId(
+ log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(),
@@ -366,7 +364,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
return;
}
- String tableNs = StateUtils.getStateNamespace(
+ String tableNs = FunctionCommon.getStateNamespace(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace()
);
@@ -586,7 +584,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
!instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
logAppender = new LogAppender(client, instanceConfig.getFunctionDetails().getLogTopic(),
- FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
+ FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
logAppender.start();
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java
deleted file mode 100644
index 7149bfe..0000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance;
-
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * Utils used for instance.
- */
-@Slf4j
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class Utils {
-
- public static final long getSequenceId(MessageId messageId) {
- MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
- ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
- : messageId);
- long ledgerId = msgId.getLedgerId();
- long entryId = msgId.getEntryId();
-
- // Combine ledger id and entry id to form offset
- // Use less than 32 bits to represent entry id since it will get
- // rolled over way before overflowing the max int range
- long offset = (ledgerId << 28) | entryId;
- return offset;
- }
-
- public static final MessageId getMessageId(long sequenceId) {
- // Demultiplex ledgerId and entryId from offset
- long ledgerId = sequenceId >>> 28;
- long entryId = sequenceId & 0x0F_FF_FF_FFL;
-
- return new MessageIdImpl(ledgerId, entryId, -1);
- }
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index 0c9b9af..081556c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -23,7 +23,7 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
import java.io.IOException;
import java.io.StringWriter;
@@ -58,7 +58,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
- Utils.ComponentType componentType) {
+ ComponentType componentType) {
switch (componentType) {
case FUNCTION:
return new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index dc5a08a..c03ed9f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -29,7 +29,7 @@ import lombok.ToString;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.functions.instance.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
@Builder
@Getter
@@ -66,7 +66,7 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
@Override
public Optional<Long> getRecordSequence() {
- return Optional.of(Utils.getSequenceId(message.getMessageId()));
+ return Optional.of(FunctionCommon.getSequenceId(message.getMessageId()));
}
@Override
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index df243cf..a898e54 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -27,14 +27,13 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
import org.mockito.Matchers;
import org.slf4j.Logger;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -78,7 +77,7 @@ public class ContextImplTest {
logger,
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
- Utils.ComponentType.FUNCTION, null);
+ ComponentType.FUNCTION, null);
}
@Test(expectedExceptions = IllegalStateException.class)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
deleted file mode 100644
index ba83c89..0000000
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link Utils}.
- */
-public class UtilsTest {
-
- @Test
- public void testGetSequenceId() {
- long lid = 12345L;
- long eid = 34566L;
- MessageIdImpl id = mock(MessageIdImpl.class);
- when(id.getLedgerId()).thenReturn(lid);
- when(id.getEntryId()).thenReturn(eid);
-
- assertEquals((lid << 28) | eid, Utils.getSequenceId(id));
- }
-
- @Test
- public void testGetMessageId() {
- long lid = 12345L;
- long eid = 34566L;
- long sequenceId = (lid << 28) | eid;
-
- MessageIdImpl id = (MessageIdImpl) Utils.getMessageId(sequenceId);
- assertEquals(lid, id.getLedgerId());
- assertEquals(eid, id.getEntryId());
- }
-
-}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index ff2cd57..e1cd3e8 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import java.util.Collections;
import java.util.Optional;
@@ -112,7 +112,7 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
@Override
public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
- String fqfn = FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, name);
+ String fqfn = FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
String secretName = new String(functionAuthData.getData());
Actions.Action deleteSecrets = Actions.Action.builder()
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index fb51b36..103980f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -62,8 +62,8 @@ import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import java.io.IOException;
import java.util.ArrayList;
@@ -370,7 +370,7 @@ public class KubernetesRuntime implements Runtime {
final V1Service service = createService();
log.info("Submitting the following service to k8 {}", coreClient.getApiClient().getJSON().serialize(service));
- String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
+ String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
Actions.Action createService = Actions.Action.builder()
.actionName(String.format("Submitting service for function %s", fqfn))
@@ -447,7 +447,7 @@ public class KubernetesRuntime implements Runtime {
log.info("Submitting the following spec to k8 {}", appsClient.getApiClient().getJSON().serialize(statefulSet));
- String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
+ String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
Actions.Action createStatefulSet = Actions.Action.builder()
.actionName(String.format("Submitting statefulset for function %s", fqfn))
@@ -495,7 +495,7 @@ public class KubernetesRuntime implements Runtime {
options.setGracePeriodSeconds(5L);
options.setPropagationPolicy("Foreground");
- String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
+ String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
Actions.Action deleteStatefulSet = Actions.Action.builder()
.actionName(String.format("Deleting statefulset for function %s", fqfn))
.numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
@@ -644,7 +644,7 @@ public class KubernetesRuntime implements Runtime {
final V1DeleteOptions options = new V1DeleteOptions();
options.setGracePeriodSeconds(0L);
options.setPropagationPolicy("Foreground");
- String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
+ String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
String serviceName = createJobName(instanceConfig.getFunctionDetails());
Actions.Action deleteService = Actions.Action.builder()
@@ -846,7 +846,7 @@ public class KubernetesRuntime implements Runtime {
private Map<String, String> getLabels(Function.FunctionDetails functionDetails) {
final Map<String, String> labels = new HashMap<>();
- Utils.ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails);
+ ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails);
String component;
switch (componentType) {
case FUNCTION:
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index 717659e..98950e7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -47,7 +47,7 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
-import static org.apache.pulsar.functions.utils.Utils.*;
+import static org.apache.pulsar.functions.utils.FunctionCommon.*;
@Slf4j
public class LocalRunner {
@@ -187,7 +187,7 @@ public class LocalRunner {
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
- instanceConfig.setPort(Utils.findAvailablePort());
+ instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName("local");
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 734925a..c202ad9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import java.io.IOException;
import java.io.InputStream;
@@ -85,7 +85,7 @@ class ProcessRuntime implements Runtime {
Long expectedHealthCheckInterval) throws Exception {
this.instanceConfig = instanceConfig;
this.instancePort = instanceConfig.getPort();
- this.metricsPort = Utils.findAvailablePort();
+ this.metricsPort = FunctionCommon.findAvailablePort();
this.expectedHealthCheckInterval = expectedHealthCheckInterval;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.funcLogDir = RuntimeUtils.genFunctionLogFolder(logDirectory, instanceConfig);
@@ -201,7 +201,7 @@ class ProcessRuntime implements Runtime {
// forcibly kill after timeout
if (process.isAlive()) {
log.warn("Process for instance {} did not exit within timeout. Forcibly killing process...",
- Utils.getFullyQualifiedInstanceId(
+ FunctionCommon.getFullyQualifiedInstanceId(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(), instanceConfig.getInstanceId()));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 5bb3ab1..206745a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -36,7 +36,7 @@ import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
@Slf4j
public class RuntimeSpawner implements AutoCloseable {
@@ -127,7 +127,7 @@ public class RuntimeSpawner implements AutoCloseable {
public CompletableFuture<String> getFunctionStatusAsJson(int instanceId) {
return this.getFunctionStatus(instanceId).thenApply(msg -> {
try {
- return Utils.printJson(msg);
+ return FunctionCommon.printJson(msg);
} catch (IOException e) {
throw new RuntimeException(
instanceConfig.getFunctionDetails().getName() + " Exception parsing getStatus", e);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index b866f46..8a5ab37 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -27,22 +27,17 @@ import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.function.Supplier;
-import lombok.Builder;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
-import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -247,7 +242,7 @@ public class RuntimeUtils {
return String.format(
"%s/%s",
logDirectory,
- FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
+ FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
}
public static String getPrometheusMetrics(int metricsPort) throws IOException{
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 8aff81a..83e3782 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -31,9 +31,9 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
/**
* A function container implemented using java thread.
@@ -104,7 +104,7 @@ class ThreadRuntime implements Runtime {
log.info("ThreadContainer starting function with instance config {}", instanceConfig);
this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
String.format("%s-%s",
- FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
+ FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
instanceConfig.getInstanceId()));
this.fnThread.start();
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index c7a9832..b1c3dfb 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -291,7 +291,7 @@ public class KubernetesRuntimeTest {
+ " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ extraDepsEnv
+ " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
- + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails())
+ + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Xmx" + String.valueOf(RESOURCES.getRam())
+ " org.apache.pulsar.functions.runtime.JavaInstanceMain"
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 5460fe3..e2c0499 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -44,7 +44,7 @@ import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -269,7 +269,7 @@ public class ProcessRuntimeTest {
+ " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ extraDepsEnv
+ " -Dlog4j.configurationFile=java_instance_log4j2.yml "
- + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails())
+ + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
+ " org.apache.pulsar.functions.runtime.JavaInstanceMain"
+ " --jar " + userJarFile + " --instance_id "
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/package-info.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
similarity index 69%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/package-info.java
rename to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
index 7cf7893..a99edc2 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/package-info.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
@@ -16,8 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pulsar.functions.utils;
-/**
- * Helper classes.
- */
-package org.apache.pulsar.functions.utils;
\ No newline at end of file
+public enum ComponentType {
+ FUNCTION("Function"),
+ SOURCE("Source"),
+ SINK("Sink");
+
+ private final String componentName;
+
+ ComponentType(String componentName) {
+ this.componentName = componentName;
+ }
+
+ @Override
+ public String toString() {
+ return componentName;
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
similarity index 81%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
rename to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 38002a1..2212734 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -18,9 +18,11 @@
*/
package org.apache.pulsar.functions.utils;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
@@ -31,7 +33,11 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
+import java.util.UUID;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.api.Function;
@@ -56,7 +62,7 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class Utils {
+public class FunctionCommon {
public static String printJson(MessageOrBuilder msg) throws IOException {
return JsonFormat.printer().print(msg);
@@ -235,6 +241,16 @@ public class Utils {
return null;
}
+ public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
+ URL website = new URL(destPkgUrl);
+ ReadableByteChannel rbc = Channels.newChannel(website.openStream());
+ log.info("Downloading function package from {} to {} ...", destPkgUrl, targetFile.getAbsoluteFile());
+ try (FileOutputStream fos = new FileOutputStream(targetFile)) {
+ fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+ }
+ log.info("Downloading function package from {} to {} completed!", destPkgUrl, targetFile.getAbsoluteFile());
+ }
+
/**
* Load a jar.
*
@@ -266,14 +282,8 @@ public class Utils {
}
return file;
} else if (destPkgUrl.startsWith("http")) {
- URL website = new URL(destPkgUrl);
File tempFile = File.createTempFile("function", ".tmp");
- ReadableByteChannel rbc = Channels.newChannel(website.openStream());
- log.info("Downloading function package from {} to {} ...", destPkgUrl, tempFile.getAbsoluteFile());
- try (FileOutputStream fos = new FileOutputStream(tempFile)) {
- fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
- }
- log.info("Downloading function package from {} to {} completed!", destPkgUrl, tempFile.getAbsoluteFile());
+ downloadFromHttpUrl(destPkgUrl, tempFile);
return tempFile;
} else {
throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
@@ -351,20 +361,61 @@ public class Utils {
return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId);
}
- public enum ComponentType {
- FUNCTION("Function"),
- SOURCE("Source"),
- SINK("Sink");
+ public static final long getSequenceId(MessageId messageId) {
+ MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
+ ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
+ : messageId);
+ long ledgerId = msgId.getLedgerId();
+ long entryId = msgId.getEntryId();
+
+ // Combine ledger id and entry id to form offset
+ // Use less than 32 bits to represent entry id since it will get
+ // rolled over way before overflowing the max int range
+ long offset = (ledgerId << 28) | entryId;
+ return offset;
+ }
+
+ public static final MessageId getMessageId(long sequenceId) {
+ // Demultiplex ledgerId and entryId from offset
+ long ledgerId = sequenceId >>> 28;
+ long entryId = sequenceId & 0x0F_FF_FF_FFL;
- private final String componentName;
+ return new MessageIdImpl(ledgerId, entryId, -1);
+ }
- ComponentType(String componentName) {
- this.componentName = componentName;
+ public static byte[] toByteArray(Object obj) throws IOException {
+ byte[] bytes = null;
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(obj);
+ oos.flush();
+ bytes = bos.toByteArray();
}
+ return bytes;
+ }
- @Override
- public String toString() {
- return componentName;
- }
+ public static String getUniquePackageName(String packageName) {
+ return String.format("%s-%s", UUID.randomUUID().toString(), packageName);
+ }
+
+ /**
+ * Convert pulsar tenant and namespace to state storage namespace.
+ *
+ * @param tenant pulsar tenant
+ * @param namespace pulsar namespace
+ * @return state storage namespace
+ */
+ public static String getStateNamespace(String tenant, String namespace) {
+ return String.format("%s_%s", tenant, namespace)
+ .replace("-", "_");
+ }
+
+ public static String getFullyQualifiedName(org.apache.pulsar.functions.proto.Function.FunctionDetails FunctionDetails) {
+ return getFullyQualifiedName(FunctionDetails.getTenant(), FunctionDetails.getNamespace(), FunctionDetails.getName());
+
+ }
+
+ public static String getFullyQualifiedName(String tenant, String namespace, String functionName) {
+ return String.format("%s/%s/%s", tenant, namespace, functionName);
}
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 1a41eb5..aeb26c3 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -39,7 +39,7 @@ import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.pulsar.common.functions.Utils.BUILTIN;
-import static org.apache.pulsar.functions.utils.Utils.loadJar;
+import static org.apache.pulsar.functions.utils.FunctionCommon.loadJar;
public class FunctionConfigUtils {
public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
@@ -48,7 +48,7 @@ public class FunctionConfigUtils {
Class<?>[] typeArgs = null;
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
if (classLoader != null) {
- typeArgs = Utils.getFunctionTypes(functionConfig, classLoader);
+ typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
}
}
@@ -159,11 +159,11 @@ public class FunctionConfigUtils {
functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
}
if (functionConfig.getRuntime() != null) {
- functionDetailsBuilder.setRuntime(Utils.convertRuntime(functionConfig.getRuntime()));
+ functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime()));
}
if (functionConfig.getProcessingGuarantees() != null) {
functionDetailsBuilder.setProcessingGuarantees(
- Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
+ FunctionCommon.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
}
if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
@@ -234,7 +234,7 @@ public class FunctionConfigUtils {
functionConfig.setNamespace(functionDetails.getNamespace());
functionConfig.setName(functionDetails.getName());
functionConfig.setParallelism(functionDetails.getParallelism());
- functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ functionConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
ConsumerConfig consumerConfig = new ConsumerConfig();
@@ -278,8 +278,8 @@ public class FunctionConfigUtils {
if (!isEmpty(functionDetails.getLogTopic())) {
functionConfig.setLogTopic(functionDetails.getLogTopic());
}
- functionConfig.setRuntime(Utils.convertRuntime(functionDetails.getRuntime()));
- functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ functionConfig.setRuntime(FunctionCommon.convertRuntime(functionDetails.getRuntime()));
+ functionConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
if (functionDetails.hasRetryDetails()) {
functionConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
if (!isEmpty(functionDetails.getRetryDetails().getDeadLetterTopic())) {
@@ -355,7 +355,7 @@ public class FunctionConfigUtils {
}
private static void doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
- Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig, clsLoader);
+ Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
// inputs use default schema, so there is no check needed there
// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
@@ -566,7 +566,7 @@ public class FunctionConfigUtils {
ClassLoader classLoader = null;
if (org.apache.commons.lang3.StringUtils.isNotBlank(functionPkgUrl)) {
try {
- classLoader = Utils.extractClassLoader(functionPkgUrl);
+ classLoader = FunctionCommon.extractClassLoader(functionPkgUrl);
} catch (Exception e) {
throw new IllegalArgumentException("Corrupted Jar File", e);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
deleted file mode 100644
index a9787ba..0000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.functions.utils;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-
-public class FunctionDetailsUtils {
-
- public static String getFullyQualifiedName(FunctionDetails FunctionDetails) {
- return getFullyQualifiedName(FunctionDetails.getTenant(), FunctionDetails.getNamespace(), FunctionDetails.getName());
- }
-
- public static String getFullyQualifiedName(String tenant, String namespace, String functionName) {
- return String.format("%s/%s/%s", tenant, namespace, functionName);
- }
-
- public static String extractTenantFromFQN(String fullyQualifiedName) {
- return fullyQualifiedName.split("/")[0];
- }
-
- public static String extractNamespaceFromFQN(String fullyQualifiedName) {
- return fullyQualifiedName.split("/")[1];
- }
-
- public static String extractFunctionNameFromFQN(String fullyQualifiedName) {
- return fullyQualifiedName.split("/")[2];
- }
-
- public static String getDownloadFileName(FunctionDetails FunctionDetails,
- Function.PackageLocationMetaData packageLocation) {
- if (!StringUtils.isEmpty(packageLocation.getOriginalFileName())) {
- return packageLocation.getOriginalFileName();
- }
- String[] hierarchy = FunctionDetails.getClassName().split("\\.");
- String fileName;
- if (hierarchy.length <= 0) {
- fileName = FunctionDetails.getClassName();
- } else if (hierarchy.length == 1) {
- fileName = hierarchy[0];
- } else {
- fileName = hierarchy[hierarchy.length - 2];
- }
- switch (FunctionDetails.getRuntime()) {
- case JAVA:
- return fileName + ".jar";
- case PYTHON:
- return fileName + ".py";
- default:
- throw new RuntimeException("Unknown runtime " + FunctionDetails.getRuntime());
- }
- }
-}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 7ba22aa..1d89738 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -165,7 +165,7 @@ public class Reflections {
public static Object createInstance(String userClassName, java.io.File jar) {
try {
- return createInstance(userClassName, Utils.loadJar(jar));
+ return createInstance(userClassName, FunctionCommon.loadJar(jar));
} catch (Exception ex) {
return null;
}
@@ -180,7 +180,7 @@ public class Reflections {
*/
public static boolean classExistsInJar(java.io.File jar, String fqcn) {
try {
- java.net.URLClassLoader loader = (URLClassLoader) Utils.loadJar(jar);
+ java.net.URLClassLoader loader = (URLClassLoader) FunctionCommon.loadJar(jar);
Class.forName(fqcn, false, loader);
loader.close();
return true;
@@ -214,7 +214,7 @@ public class Reflections {
public static boolean classInJarImplementsIface(java.io.File jar, String fqcn, Class xface) {
boolean ret = false;
try {
- java.net.URLClassLoader loader = (URLClassLoader) Utils.loadJar(jar);
+ java.net.URLClassLoader loader = (URLClassLoader) FunctionCommon.loadJar(jar);
if (xface.isAssignableFrom(Class.forName(fqcn, false, loader))){
ret = true;
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 81554c4..72a9e6e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -44,8 +44,8 @@ import java.util.*;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
-import static org.apache.pulsar.functions.utils.Utils.getSinkType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
@Slf4j
public class SinkConfigUtils {
@@ -204,7 +204,7 @@ public class SinkConfigUtils {
sinkConfig.setNamespace(functionDetails.getNamespace());
sinkConfig.setName(functionDetails.getName());
sinkConfig.setParallelism(functionDetails.getParallelism());
- sinkConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ sinkConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
ConsumerConfig consumerConfig = new ConsumerConfig();
@@ -306,11 +306,11 @@ public class SinkConfigUtils {
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
try {
- jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ jarClassLoader = FunctionCommon.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
}
try {
- narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
}
if (jarClassLoader == null && narClassLoader == null) {
@@ -332,7 +332,7 @@ public class SinkConfigUtils {
} else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
- classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ classLoader = FunctionCommon.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
if (classLoader == null) {
throw new IllegalArgumentException("Sink Package is not provided");
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 9a32085..9f71d63 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -41,8 +41,8 @@ import java.nio.file.Path;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
-import static org.apache.pulsar.functions.utils.Utils.getSourceType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
public class SourceConfigUtils {
@@ -146,7 +146,7 @@ public class SourceConfigUtils {
sourceConfig.setNamespace(functionDetails.getNamespace());
sourceConfig.setName(functionDetails.getName());
sourceConfig.setParallelism(functionDetails.getParallelism());
- sourceConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ sourceConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
Function.SourceSpec sourceSpec = functionDetails.getSource();
if (!StringUtils.isEmpty(sourceSpec.getClassName())) {
sourceConfig.setClassName(sourceSpec.getClassName());
@@ -217,11 +217,11 @@ public class SourceConfigUtils {
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
try {
- jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ jarClassLoader = FunctionCommon.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
}
try {
- narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
}
if (jarClassLoader == null && narClassLoader == null) {
@@ -243,7 +243,7 @@ public class SourceConfigUtils {
} else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
- classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ classLoader = FunctionCommon.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
if (classLoader == null) {
throw new IllegalArgumentException("Source Package is not provided");
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/StateUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/StateUtils.java
deleted file mode 100644
index 8eabf3a..0000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/StateUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.utils;
-
-/**
- * Utils for state store.
- */
-public final class StateUtils {
-
- private StateUtils() {}
-
- /**
- * Convert pulsar tenant and namespace to state storage namespace.
- *
- * @param tenant pulsar tenant
- * @param namespace pulsar namespace
- * @return state storage namespace
- */
- public static String getStateNamespace(String tenant, String namespace) {
- return String.format("%s_%s", tenant, namespace)
- .replace("-", "_");
- }
-
-}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
index 72051e5..93a6a72 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
@@ -35,7 +35,7 @@ public class ValidatorUtils {
// If it's empty, we use the default schema and no need to validate
// If it's built-in, no need to validate
} else {
- Utils.implementsClass(schemaType, Schema.class, clsLoader);
+ FunctionCommon.implementsClass(schemaType, Schema.class, clsLoader);
validateSchemaType(schemaType, typeArg, clsLoader, input);
}
}
@@ -54,13 +54,13 @@ public class ValidatorUtils {
if (isEmpty(inputSerializer)) return;
if (inputSerializer.equals(DEFAULT_SERDE)) return;
try {
- Class<?> serdeClass = Utils.loadClass(inputSerializer, clsLoader);
+ Class<?> serdeClass = FunctionCommon.loadClass(inputSerializer, clsLoader);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("The input serialization/deserialization class %s does not exist",
inputSerializer));
}
- Utils.implementsClass(inputSerializer, SerDe.class, clsLoader);
+ FunctionCommon.implementsClass(inputSerializer, SerDe.class, clsLoader);
SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
if (serDe == null) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/package-info.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/package-info.java
deleted file mode 100644
index eef11b3..0000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * A cache system for managing function library dependencies.
- */
-package org.apache.pulsar.functions.utils.functioncache;
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
similarity index 55%
rename from pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
rename to pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
index 72dd26e..2fbb6c0 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
@@ -19,46 +19,86 @@
package org.apache.pulsar.functions.utils;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.io.File;
+import java.util.UUID;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
/**
* Unit test of {@link Exceptions}.
*/
-public class UtilsTest {
+public class FunctionCommonTest {
@Test
public void testValidateLocalFileUrl() throws Exception {
String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
try {
// eg: fileLocation : /dir/fileName.jar (invalid)
- Utils.extractClassLoader(fileLocation);
+ FunctionCommon.extractClassLoader(fileLocation);
Assert.fail("should fail with invalid url: without protocol");
} catch (IllegalArgumentException ie) {
// Ok.. expected exception
}
String fileLocationWithProtocol = "file://" + fileLocation;
// eg: fileLocation : file:///dir/fileName.jar (valid)
- Utils.extractClassLoader(fileLocationWithProtocol);
+ FunctionCommon.extractClassLoader(fileLocationWithProtocol);
// eg: fileLocation : file:/dir/fileName.jar (valid)
fileLocationWithProtocol = "file:" + fileLocation;
- Utils.extractClassLoader(fileLocationWithProtocol);
+ FunctionCommon.extractClassLoader(fileLocationWithProtocol);
}
@Test
public void testValidateHttpFileUrl() throws Exception {
String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
- Utils.extractClassLoader(jarHttpUrl);
+ FunctionCommon.extractClassLoader(jarHttpUrl);
jarHttpUrl = "http://_invalidurl_.com";
try {
// eg: fileLocation : /dir/fileName.jar (invalid)
- Utils.extractClassLoader(jarHttpUrl);
+ FunctionCommon.extractClassLoader(jarHttpUrl);
Assert.fail("should fail with invalid url: without protocol");
} catch (Exception ie) {
// Ok.. expected exception
}
}
+
+ @Test
+ public void testDownloadFile() throws Exception {
+ String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
+ String testDir = FunctionCommonTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+ File pkgFile = new File(testDir, UUID.randomUUID().toString());
+ FunctionCommon.downloadFromHttpUrl(jarHttpUrl, pkgFile);
+ Assert.assertTrue(pkgFile.exists());
+ pkgFile.delete();
+ }
+
+ @Test
+ public void testGetSequenceId() {
+ long lid = 12345L;
+ long eid = 34566L;
+ MessageIdImpl id = mock(MessageIdImpl.class);
+ when(id.getLedgerId()).thenReturn(lid);
+ when(id.getEntryId()).thenReturn(eid);
+
+ assertEquals((lid << 28) | eid, FunctionCommon.getSequenceId(id));
+ }
+
+ @Test
+ public void testGetMessageId() {
+ long lid = 12345L;
+ long eid = 34566L;
+ long sequenceId = (lid << 28) | eid;
+
+ MessageIdImpl id = (MessageIdImpl) FunctionCommon.getMessageId(sequenceId);
+ assertEquals(lid, id.getLedgerId());
+ assertEquals(eid, id.getEntryId());
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 246834e..b137555 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -46,7 +46,7 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import java.io.File;
@@ -69,8 +69,8 @@ import static org.apache.pulsar.common.functions.Utils.FILE;
import static org.apache.pulsar.common.functions.Utils.HTTP;
import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.Utils.getSinkType;
-import static org.apache.pulsar.functions.utils.Utils.getSourceType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
@Data
@Setter
@@ -97,6 +97,7 @@ public class FunctionActioner {
this.pulsarAdmin = pulsarAdmin;
}
+
public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
try {
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
@@ -127,7 +128,7 @@ public class FunctionActioner {
pkgDir.mkdirs();
File pkgFile = new File(
pkgDir,
- new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
+ new File(getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
packageFile = pkgFile.getAbsolutePath();
}
@@ -175,7 +176,7 @@ public class FunctionActioner {
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(1024);
- instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+ instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName(clusterName);
instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
return instanceConfig;
@@ -208,9 +209,9 @@ public class FunctionActioner {
downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation());
if(downloadFromHttp) {
- Utils.downloadFromHttpUrl(pkgLocationPath, new FileOutputStream(tempPkgFile));
+ FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
} else {
- Utils.downloadFromBookkeeper(
+ WorkerUtils.downloadFromBookkeeper(
dlogNamespace,
new FileOutputStream(tempPkgFile),
pkgLocationPath);
@@ -249,7 +250,7 @@ public class FunctionActioner {
Paths.get(pkgDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
} catch (IOException e) {
log.warn("Failed to delete package for function: {}",
- FunctionDetailsUtils.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
+ FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
}
}
}
@@ -270,7 +271,7 @@ public class FunctionActioner {
public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
- String fqfn = FunctionDetailsUtils.getFullyQualifiedName(details);
+ String fqfn = FunctionCommon.getFullyQualifiedName(details);
log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
if (functionRuntimeInfo.getRuntimeSpawner() != null) {
@@ -460,4 +461,27 @@ public class FunctionActioner {
}
}
+ private static String getDownloadFileName(FunctionDetails FunctionDetails,
+ Function.PackageLocationMetaData packageLocation) {
+ if (!org.apache.commons.lang.StringUtils.isEmpty(packageLocation.getOriginalFileName())) {
+ return packageLocation.getOriginalFileName();
+ }
+ String[] hierarchy = FunctionDetails.getClassName().split("\\.");
+ String fileName;
+ if (hierarchy.length <= 0) {
+ fileName = FunctionDetails.getClassName();
+ } else if (hierarchy.length == 1) {
+ fileName = hierarchy[0];
+ } else {
+ fileName = hierarchy[hierarchy.length - 2];
+ }
+ switch (FunctionDetails.getRuntime()) {
+ case JAVA:
+ return fileName + ".jar";
+ case PYTHON:
+ return fileName + ".py";
+ default:
+ throw new RuntimeException("Unknown runtime " + FunctionDetails.getRuntime());
+ }
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 8c79f83..4618e88 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionInstanceId;
import org.apache.pulsar.functions.utils.Reflections;
@@ -321,7 +322,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
final String workerId = this.workerConfig.getWorkerId();
if (assignedWorkerId.equals(workerId)) {
- stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), true);
+ stopFunction(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), true);
return;
} else {
// query other worker
@@ -361,7 +362,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
Assignment assignment = assignments.iterator().next();
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
- String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+ String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
if (assignedWorkerId.equals(workerId)) {
stopFunction(fullyQualifiedInstanceId, true);
} else {
@@ -386,7 +387,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
for (Assignment assignment : assignments) {
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
- String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+ String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
if (assignedWorkerId.equals(workerId)) {
stopFunction(fullyQualifiedInstanceId, true);
} else {
@@ -424,7 +425,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
Map<String, Assignment> assignments = workerIdToAssignments.get(workerId);
if (assignments != null) {
assignments.values().forEach(assignment -> {
- String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+ String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
try {
stopFunction(fullyQualifiedInstanceId, false);
} catch (Exception e) {
@@ -478,10 +479,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
// If I am running worker
if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
- org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+ FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
if (runtimeSpawner != null) {
- return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo, instanceId).getMetrics();
+ return WorkerUtils.getFunctionInstanceStats(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo, instanceId).getMetrics();
}
return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
} else {
@@ -595,7 +596,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
existingAssignmentMap.putAll(entry);
}
- if (existingAssignmentMap.containsKey(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(newAssignment.getInstance()))) {
+ if (existingAssignmentMap.containsKey(FunctionCommon.getFullyQualifiedInstanceId(newAssignment.getInstance()))) {
updateAssignment(newAssignment);
} else {
addAssignment(newAssignment);
@@ -603,7 +604,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
private void updateAssignment(Assignment assignment) {
- String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+ String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
Assignment existingAssignment = this.findAssignment(assignment);
// potential updates need to happen
@@ -728,7 +729,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
@VisibleForTesting
void deleteAssignment(Assignment assignment) {
- String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+ String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
Map<String, Assignment> assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId());
if (assignmentMap != null) {
if (assignmentMap.containsKey(fullyQualifiedInstanceId)) {
@@ -751,7 +752,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
private void startFunctionInstance(Assignment assignment) {
- String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+ String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
FunctionRuntimeInfo functionRuntimeInfo = _getFunctionRuntimeInfo(fullyQualifiedInstanceId);
if (functionRuntimeInfo == null) {
@@ -776,7 +777,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
*/
private Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) {
String fullyQualifiedInstanceId
- = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
+ = FunctionCommon.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
for (Map.Entry<String, Map<String, Assignment>> entry : this.workerIdToAssignments.entrySet()) {
Map<String, Assignment> assignmentMap = entry.getValue();
Assignment existingAssignment = assignmentMap.get(fullyQualifiedInstanceId);
@@ -802,7 +803,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
this.workerIdToAssignments.put(assignment.getWorkerId(), new HashMap<>());
}
this.workerIdToAssignments.get(assignment.getWorkerId()).put(
- org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
+ FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()),
assignment);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index efe702f..f78513a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -46,7 +46,8 @@ import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction;
/**
@@ -179,7 +180,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
List<Function.FunctionMetaData> functionMetaDataList = functionMetaDataManager.getAllFunctionMetaData();
Map<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<>();
for (Function.FunctionMetaData entry : functionMetaDataList) {
- functionMetaDataMap.put(FunctionDetailsUtils.getFullyQualifiedName(entry.getFunctionDetails()), entry);
+ functionMetaDataMap.put(FunctionCommon.getFullyQualifiedName(entry.getFunctionDetails()), entry);
}
Map<String, Map<String, Function.Assignment>> currentAssignments = functionRuntimeManager.getCurrentAssignments();
Map<String, Function.Assignment> assignmentMap = new HashMap<>();
@@ -192,9 +193,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
Iterator<Map.Entry<Function.Instance, Long>> it = unsignedFunctionDurations.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Function.Instance, Long> entry = it.next();
- String fullyQualifiedFunctionName = FunctionDetailsUtils.getFullyQualifiedName(
+ String fullyQualifiedFunctionName = FunctionCommon.getFullyQualifiedName(
entry.getKey().getFunctionMetaData().getFunctionDetails());
- String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(entry.getKey());
+ String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(entry.getKey());
//remove functions that don't exist anymore
if (!functionMetaDataMap.containsKey(fullyQualifiedFunctionName)) {
it.remove();
@@ -262,7 +263,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
if (currentTimeMs - unassignedDurationMs > this.workerConfig.getRescheduleTimeoutMs()) {
needSchedule.add(instance);
// remove assignment from failed node
- Function.Assignment assignment = assignmentMap.get(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(instance));
+ Function.Assignment assignment = assignmentMap.get(FunctionCommon.getFullyQualifiedInstanceId(instance));
if (assignment != null) {
needRemove.add(assignment);
}
@@ -284,7 +285,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
private PulsarAdmin getPulsarAdminClient() {
if (this.pulsarAdminClient == null) {
- this.pulsarAdminClient = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl(),
+ this.pulsarAdminClient = WorkerUtils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl(),
workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index c281b15..2e25f0f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,18 +18,15 @@
*/
package org.apache.pulsar.functions.worker;
-import com.google.common.base.Stopwatch;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -49,7 +46,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.Instance;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
import com.google.common.annotations.VisibleForTesting;
@@ -256,7 +253,7 @@ public class SchedulerManager implements AutoCloseable {
private void publishNewAssignment(Assignment assignment, boolean deleted) {
try {
- String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+ String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
// publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id
// message
producer.newMessage().key(fullyQualifiedInstanceId)
@@ -272,7 +269,7 @@ public class SchedulerManager implements AutoCloseable {
Map<String, Function.Instance> functionInstances = new HashMap<>();
for (FunctionMetaData functionMetaData : allFunctions) {
for (Function.Instance instance : computeInstances(functionMetaData, externallyManagedRuntime)) {
- functionInstances.put(Utils.getFullyQualifiedInstanceId(instance), instance);
+ functionInstances.put(FunctionCommon.getFullyQualifiedInstanceId(instance), instance);
}
}
return functionInstances;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index db84462..b9814b3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -74,7 +74,7 @@ public class Worker {
private static URI initialize(WorkerConfig workerConfig)
throws InterruptedException, PulsarAdminException, IOException {
// initializing pulsar functions namespace
- PulsarAdmin admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+ PulsarAdmin admin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
InternalConfigurationData internalConf;
@@ -141,7 +141,7 @@ public class Worker {
// initialize the dlog namespace
// TODO: move this as part of pulsar cluster initialization later
try {
- return Utils.initializeDlogNamespace(
+ return WorkerUtils.initializeDlogNamespace(
internalConf.getZookeeperServers(),
internalConf.getLedgersRootPath());
} catch (IOException ioe) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index b035fb8..c3211b2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
-import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -37,24 +36,16 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
-import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
-import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
/**
* A service component contains everything to run a worker except rest server.
@@ -101,14 +92,14 @@ public class WorkerService {
AuthorizationService authorizationService) throws InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());
- this.brokerAdmin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+ this.brokerAdmin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
final String functionWebServiceUrl = StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
? workerConfig.getFunctionWebServiceUrl()
: workerConfig.getWorkerWebAddress();
- this.functionAdmin = Utils.getPulsarAdminClient(functionWebServiceUrl,
+ this.functionAdmin = WorkerUtils.getPulsarAdminClient(functionWebServiceUrl,
workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
@@ -121,7 +112,7 @@ public class WorkerService {
// create the dlog namespace for storing function packages
this.dlogUri = dlogUri;
- DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig);
+ DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
try {
this.dlogNamespace = NamespaceBuilder.newBuilder()
.conf(dlogConf)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
similarity index 91%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 3c1fa4c..3332f5c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -18,19 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
-import java.io.*;
-import java.net.URI;
-import java.net.URL;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
@@ -49,25 +37,23 @@ import org.apache.pulsar.functions.worker.dlog.DLInputStream;
import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
import org.apache.zookeeper.KeeperException.Code;
-@Slf4j
-public final class Utils {
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
- private Utils(){}
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
- public static byte[] toByteArray(Object obj) throws IOException {
- byte[] bytes = null;
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(obj);
- oos.flush();
- bytes = bos.toByteArray();
- }
- return bytes;
- }
+@Slf4j
+public final class WorkerUtils {
- public static String getUniquePackageName(String packageName) {
- return String.format("%s-%s", UUID.randomUUID().toString(), packageName);
- }
+ private WorkerUtils(){}
public static void uploadFileToBookkeeper(String packagePath, File sourceFile, Namespace dlogNamespace) throws IOException {
FileInputStream uploadedInputStream = new FileInputStream(sourceFile);
@@ -104,12 +90,6 @@ public final class Utils {
}
}
}
-
- public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outputStream) throws IOException {
- URL website = new URL(destPkgUrl);
- ReadableByteChannel rbc = Channels.newChannel(website.openStream());
- outputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
- }
public static void downloadFromBookkeeper(Namespace namespace,
File outputFile,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index fd75da3..31070af 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -73,10 +73,10 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.join;
-import static org.apache.pulsar.functions.utils.Utils.*;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
+import static org.apache.pulsar.functions.utils.FunctionCommon.*;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -108,14 +108,15 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.functions.utils.StateUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -169,7 +170,7 @@ public abstract class ComponentImpl {
// If I am running worker
if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo = worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(
- org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+ FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()));
if (functionRuntimeInfo == null) {
return notRunning(assignedWorkerId, "");
}
@@ -446,20 +447,20 @@ public abstract class ComponentImpl {
sinkOrSource.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
- Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, worker().getDlogNamespace());
+ WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, worker().getDlogNamespace());
} else if (isPkgUrlProvided) {
File file = extractFileFromPkg(functionPkgUrl);
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
file.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(file.getName());
log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
- Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), file, worker().getDlogNamespace());
+ WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), file, worker().getDlogNamespace());
} else {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
fileDetail.getFileName()));
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
- Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
+ WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
}
} else {
// For pulsar managed schedulers, the pkgUrl/builtin stuff should be copied to bk
@@ -471,7 +472,7 @@ public abstract class ComponentImpl {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
- Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
+ WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
}
}
return packageLocationMetaDataBuilder;
@@ -651,7 +652,7 @@ public abstract class ComponentImpl {
// delete state table
if (null != worker().getStateStoreAdminClient()) {
- final String tableNs = StateUtils.getStateNamespace(tenant, namespace);
+ final String tableNs = getStateNamespace(tenant, namespace);
final String tableName = componentName;
try {
FutureUtils.result(worker().getStateStoreAdminClient().deleteStream(tableNs, tableName));
@@ -1339,7 +1340,7 @@ public abstract class ComponentImpl {
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
- String tableNs = StateUtils.getStateNamespace(tenant, namespace);
+ String tableNs = getStateNamespace(tenant, namespace);
String tableName = functionName;
String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
@@ -1389,7 +1390,7 @@ public abstract class ComponentImpl {
// Upload to bookkeeper
try {
log.info("Uploading function package to {}", path);
- Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, path);
+ WorkerUtils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, path);
} catch (IOException e) {
log.error("Error uploading file {}", path, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
@@ -1414,7 +1415,7 @@ public abstract class ComponentImpl {
throw new IllegalArgumentException("invalid file url path: " + path);
}
} else {
- Utils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
+ WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
}
}
};
@@ -1514,7 +1515,7 @@ public abstract class ComponentImpl {
final ComponentType componentType) throws Exception {
File tmpFile = File.createTempFile("functions", null);
tmpFile.deleteOnExit();
- Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath());
+ WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath());
return validateUpdateRequestParams(tenant, namespace, componentName,
null, componentConfigJson, componentType, null, tmpFile);
}
@@ -1659,7 +1660,7 @@ public abstract class ComponentImpl {
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
}
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
- org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder);
+ FunctionCommon.mergeJson(functionDetailsJson, functionDetailsBuilder);
if (isNotBlank(functionPkgUrl)) {
// set package-url if present
functionDetailsBuilder.setPackageUrl(functionPkgUrl);
@@ -1728,7 +1729,7 @@ public abstract class ComponentImpl {
// validate function class-type
Object functionObject = createInstance(functionDetailsBuilder.getClassName(), classLoader);
- Class<?>[] typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false);
+ Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionObject, false);
if (!(functionObject instanceof org.apache.pulsar.functions.api.Function)
&& !(functionObject instanceof java.util.function.Function)) {
@@ -1832,7 +1833,7 @@ public abstract class ComponentImpl {
public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName),
- Utils.getUniquePackageName(Codec.encode(fileName)));
+ getUniquePackageName(Codec.encode(fileName)));
}
public boolean isAuthorizedRole(String tenant, String namespace, String clientRole,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 04628d6..93642f6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -197,7 +197,7 @@ public class FunctionsImpl extends ComponentImpl {
}
public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, Utils.ComponentType.FUNCTION);
+ super(workerServiceSupplier, ComponentType.FUNCTION);
}
/**
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index 49d1156..aed383e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -61,7 +62,7 @@ public class FunctionsImplV2 {
Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace,
functionName);
- String functionDetailsJson = org.apache.pulsar.functions.utils.Utils.printJson(functionMetaData.getFunctionDetails());
+ String functionDetailsJson = FunctionCommon.printJson(functionMetaData.getFunctionDetails());
return Response.status(Response.Status.OK).entity(functionDetailsJson).build();
}
@@ -71,7 +72,7 @@ public class FunctionsImplV2 {
org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri, null, null);
- String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(toProto(functionInstanceStatus, instanceId));
+ String jsonResponse = FunctionCommon.printJson(toProto(functionInstanceStatus, instanceId));
return Response.status(Response.Status.OK).entity(jsonResponse).build();
}
@@ -82,7 +83,7 @@ public class FunctionsImplV2 {
functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList(
toProto(functionInstanceStatus.getStatus(),
String.valueOf(functionInstanceStatus.getInstanceId()))));
- String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatusList);
+ String jsonResponse = FunctionCommon.printJson(functionStatusList);
return Response.status(Response.Status.OK).entity(jsonResponse).build();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index acd924b..e4ab6d3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -26,8 +26,8 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
-import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -204,7 +204,7 @@ public class SinkImpl extends ComponentImpl {
}
public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, Utils.ComponentType.SINK);
+ super(workerServiceSupplier, ComponentType.SINK);
}
public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index e5ed28e..44b36dc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -26,8 +26,8 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -206,7 +206,7 @@ public class SourceImpl extends ComponentImpl {
}
public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, Utils.ComponentType.SOURCE);
+ super(workerServiceSupplier, ComponentType.SOURCE);
}
public SourceStatus getSourceStatus(final String tenant,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 79f4df2..041ac26 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -24,10 +24,11 @@ import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -163,9 +164,9 @@ public class WorkerImpl {
int parallelism = functionDetails.getParallelism();
for (int i = 0; i < parallelism; ++i) {
FunctionStats.FunctionInstanceStats functionInstanceStats =
- Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, i);
+ WorkerUtils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, i);
WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
- workerFunctionInstanceStats.setName(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(
+ workerFunctionInstanceStats.setName(FunctionCommon.getFullyQualifiedInstanceId(
functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), i
));
workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
@@ -173,7 +174,7 @@ public class WorkerImpl {
}
} else {
FunctionStats.FunctionInstanceStats functionInstanceStats =
- Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo,
+ WorkerUtils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo,
functionRuntimeInfo.getFunctionInstance().getInstanceId());
WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 91fec23..b009567 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.KubernetesRuntime;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -219,7 +219,7 @@ public class FunctionRuntimeManagerTest {
functionRuntimeManager.processAssignment(assignment1);
functionRuntimeManager.processAssignment(assignment2);
- functionRuntimeManager.deleteAssignment(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()));
+ functionRuntimeManager.deleteAssignment(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()));
verify(functionRuntimeManager, times(0)).setAssignment(any(Function.Assignment.class));
verify(functionRuntimeManager, times(1)).deleteAssignment(any(String.class));
@@ -551,16 +551,16 @@ public class FunctionRuntimeManagerTest {
List<Message<byte[]>> messageList = new LinkedList<>();
Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null));
- doReturn(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+ doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null));
- doReturn(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+ doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
// delete function2
Message message3 = spy(new MessageImpl("foo", MessageId.latest.toString(),
new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null));
- doReturn(Utils.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
+ doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
messageList.add(message1);
messageList.add(message2);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 465729e..9f4fa2e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.mockito.Mockito;
import org.mockito.invocation.Invocation;
@@ -155,7 +155,7 @@ public class SchedulerManagerTest {
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
- assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+ assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -201,7 +201,7 @@ public class SchedulerManagerTest {
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
- assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+ assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -248,7 +248,7 @@ public class SchedulerManagerTest {
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
- assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+ assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -315,9 +315,9 @@ public class SchedulerManagerTest {
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
- assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+ assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
//TODO: delete this assignment
- assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
+ assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -374,7 +374,7 @@ public class SchedulerManagerTest {
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
- assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+ assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -406,7 +406,7 @@ public class SchedulerManagerTest {
Assert.assertEquals(assignments, assignment2);
// updating assignments
- currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
+ currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
// scale up
@@ -485,7 +485,7 @@ public class SchedulerManagerTest {
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
- assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+ assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -539,9 +539,9 @@ public class SchedulerManagerTest {
assertTrue(allAssignments.contains(assignment2_3));
// updating assignments
- currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
- currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
- currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
+ currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
+ currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
+ currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
// scale down
@@ -666,7 +666,7 @@ public class SchedulerManagerTest {
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
- assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+ assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
currentAssignments.put("worker-1", assignmentEntry1);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -717,9 +717,9 @@ public class SchedulerManagerTest {
assertTrue(allAssignments.contains(assignment2_3));
// updating assignments
- currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
- currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
- currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
+ currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
+ currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
+ currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
// update field
@@ -806,11 +806,11 @@ public class SchedulerManagerTest {
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
- assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+ assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
currentAssignments.put("worker-1", assignmentEntry1);
Map<String, Function.Assignment> assignmentEntry2 = new HashMap<>();
- assignmentEntry2.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
+ assignmentEntry2.put(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
currentAssignments.put("worker-2", assignmentEntry2);
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java
deleted file mode 100644
index 77f95d0..0000000
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.UUID;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link Utils}.
- */
-public class UtilsTest {
-
- @Test
- public void testDownloadFile() throws Exception {
- String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
- String testDir = UtilsTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- File pkgFile = new File(testDir, UUID.randomUUID().toString());
- Utils.downloadFromHttpUrl(jarHttpUrl, new FileOutputStream(pkgFile));
- Assert.assertTrue(pkgFile.exists());
- pkgFile.delete();
- }
-
-}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
index 6e9993e..91fa0e6 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -37,7 +37,7 @@ import org.testng.annotations.Test;
/**
* Unit test of {@link ServiceRequestManager}.
*/
-@PrepareForTest(Utils.class)
+@PrepareForTest(WorkerUtils.class)
public class ServiceRequestManagerTest {
private final Producer producer;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index f774180..17a3086 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -38,7 +38,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -53,7 +53,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
@@ -197,9 +197,9 @@ public class FunctionsImplTest {
FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
- FunctionStats.FunctionInstanceStats instanceStats1 = Utils
+ FunctionStats.FunctionInstanceStats instanceStats1 = WorkerUtils
.getFunctionInstanceStats("public/default/test", functionRuntimeInfo, 0);
- FunctionStats.FunctionInstanceStats instanceStats2 = Utils
+ FunctionStats.FunctionInstanceStats instanceStats2 = WorkerUtils
.getFunctionInstanceStats("public/default/test", functionRuntimeInfo, 1);
FunctionStats functionStats = new FunctionStats();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 2926c93..5072448 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -75,8 +75,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.Utils.mergeJson;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.FunctionCommon.mergeJson;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -92,7 +92,7 @@ import static org.testng.Assert.assertEquals;
/**
* Unit test of {@link FunctionApiV2Resource}.
*/
-@PrepareForTest(Utils.class)
+@PrepareForTest(WorkerUtils.class)
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
@Slf4j
public class FunctionApiV2ResourceTest {
@@ -524,10 +524,10 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testRegisterFunctionUploadFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doThrow(new IOException("upload failure")).when(Utils.class);
+ mockStatic(WorkerUtils.class);
+ doThrow(new IOException("upload failure")).when(WorkerUtils.class);
- Utils.uploadFileToBookkeeper(
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -543,9 +543,9 @@ public class FunctionApiV2ResourceTest {
@Test
public void testRegisterFunctionSuccess() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -590,9 +590,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
public void testRegisterFunctionFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -615,9 +615,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
public void testRegisterFunctionInterrupted() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -705,9 +705,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateFunctionMissingPackage() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -729,9 +729,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateFunctionMissingInputTopic() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -753,9 +753,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateFunctionMissingClassName() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -777,9 +777,9 @@ public class FunctionApiV2ResourceTest {
@Test
public void testUpdateFunctionChangedParallelism() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -801,9 +801,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topics differ")
public void testUpdateFunctionChangedInputs() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -825,9 +825,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
public void testUpdateFunctionChangedOutput() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
Map<String, String> someOtherInput = new HashMap<>();
someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
testUpdateFunctionMissingArguments(
@@ -948,9 +948,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testUpdateFunctionUploadFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doThrow(new IOException("upload failure")).when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -966,9 +966,9 @@ public class FunctionApiV2ResourceTest {
@Test
public void testUpdateFunctionSuccess() throws Exception {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -1025,9 +1025,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
public void testUpdateFunctionFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -1050,9 +1050,9 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
public void testUpdateFunctionInterrupted() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 871ff5f..3b139e8 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -75,9 +75,9 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -93,7 +93,7 @@ import static org.testng.Assert.assertEquals;
/**
* Unit test of {@link FunctionApiV2Resource}.
*/
-@PrepareForTest(Utils.class)
+@PrepareForTest(WorkerUtils.class)
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
@Slf4j
public class FunctionApiV3ResourceTest {
@@ -521,10 +521,10 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testRegisterFunctionUploadFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doThrow(new IOException("upload failure")).when(Utils.class);
+ mockStatic(WorkerUtils.class);
+ doThrow(new IOException("upload failure")).when(WorkerUtils.class);
- Utils.uploadFileToBookkeeper(
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -540,9 +540,9 @@ public class FunctionApiV3ResourceTest {
@Test
public void testRegisterFunctionSuccess() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -587,9 +587,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
public void testRegisterFunctionFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -612,9 +612,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
public void testRegisterFunctionInterrupted() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -702,9 +702,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateFunctionMissingPackage() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -726,9 +726,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateFunctionMissingInputTopic() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -750,9 +750,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateFunctionMissingClassName() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -774,9 +774,9 @@ public class FunctionApiV3ResourceTest {
@Test
public void testUpdateFunctionChangedParallelism() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -798,9 +798,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topics differ")
public void testUpdateFunctionChangedInputs() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
tenant,
namespace,
@@ -822,9 +822,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
public void testUpdateFunctionChangedOutput() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
Map<String, String> someOtherInput = new HashMap<>();
someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
testUpdateFunctionMissingArguments(
@@ -945,9 +945,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testUpdateFunctionUploadFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doThrow(new IOException("upload failure")).when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -963,9 +963,9 @@ public class FunctionApiV3ResourceTest {
@Test
public void testUpdateFunctionSuccess() throws Exception {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -1022,9 +1022,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
public void testUpdateFunctionFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
@@ -1047,9 +1047,9 @@ public class FunctionApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
public void testUpdateFunctionInterrupted() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadToBookeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index b723411..5112d88 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -38,11 +38,13 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -88,7 +90,7 @@ import static org.testng.Assert.assertEquals;
/**
* Unit test of {@link SinkApiV3Resource}.
*/
-@PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
+@PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, ConnectorUtils.class, FunctionCommon.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
@Slf4j
public class SinkApiV3ResourceTest {
@@ -174,7 +176,7 @@ public class SinkApiV3ResourceTest {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SinkImpl(() -> mockedWorkerService));
- Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
+ Mockito.doReturn(ComponentType.SINK).when(this.resource).calculateSubjectType(any());
}
//
@@ -455,9 +457,9 @@ public class SinkApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testRegisterSinkUploadFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doThrow(new IOException("upload failure")).when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -473,9 +475,9 @@ public class SinkApiV3ResourceTest {
@Test
public void testRegisterSinkSuccess() throws Exception {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -493,9 +495,9 @@ public class SinkApiV3ResourceTest {
@Test
public void testRegisterSinkConflictingFields() throws Exception {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -535,9 +537,9 @@ public class SinkApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
public void testRegisterSinkFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -560,9 +562,9 @@ public class SinkApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
public void testRegisterSinkInterrupted() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -644,9 +646,9 @@ public class SinkApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateSinkMissingPackage() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateSinkMissingArguments(
tenant,
@@ -667,9 +669,9 @@ public class SinkApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateSinkMissingInputs() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateSinkMissingArguments(
tenant,
@@ -690,9 +692,9 @@ public class SinkApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
public void testUpdateSinkDifferentInputs() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
Map<String, String> inputTopics = new HashMap<>();
inputTopics.put("DifferntTopic", DEFAULT_SERDE);
@@ -714,9 +716,9 @@ public class SinkApiV3ResourceTest {
@Test
public void testUpdateSinkDifferentParallelism() throws IOException {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateSinkMissingArguments(
tenant,
@@ -744,15 +746,15 @@ public class SinkApiV3ResourceTest {
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
- mockStatic(org.apache.pulsar.functions.utils.Utils.class);
- doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class));
+ mockStatic(FunctionCommon.class);
+ doReturn(String.class).when(FunctionCommon.class);
+ FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
- doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+ doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+ FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
- doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
+ FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -814,15 +816,15 @@ public class SinkApiV3ResourceTest {
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
- mockStatic(org.apache.pulsar.functions.utils.Utils.class);
- doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class));
+ mockStatic(FunctionCommon.class);
+ doReturn(String.class).when(FunctionCommon.class);
+ FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
- doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+ doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+ FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
- doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
+ FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -854,9 +856,9 @@ public class SinkApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testUpdateSinkUploadFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doThrow(new IOException("upload failure")).when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -872,9 +874,9 @@ public class SinkApiV3ResourceTest {
@Test
public void testUpdateSinkSuccess() throws Exception {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -909,15 +911,15 @@ public class SinkApiV3ResourceTest {
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
- mockStatic(org.apache.pulsar.functions.utils.Utils.class);
- doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class));
+ mockStatic(FunctionCommon.class);
+ doReturn(String.class).when(FunctionCommon.class);
+ FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
- doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+ doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+ FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
- doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
+ FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -944,9 +946,9 @@ public class SinkApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
public void testUpdateSinkFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -969,9 +971,9 @@ public class SinkApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
public void testUpdateSinkInterrupted() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -1307,9 +1309,9 @@ public class SinkApiV3ResourceTest {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
- doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
- doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
- doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+ doReturn(ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+ doReturn(ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+ doReturn(ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
List<String> sinkList = listDefaultSinks();
assertEquals(functions, sinkList);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index adaa606..332a360 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -41,11 +41,13 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -87,7 +89,7 @@ import static org.testng.Assert.assertEquals;
/**
* Unit test of {@link SourceApiV3Resource}.
*/
-@PrepareForTest({Utils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
+@PrepareForTest({WorkerUtils.class, ConnectorUtils.class, FunctionCommon.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
@Slf4j
public class SourceApiV3ResourceTest {
@@ -169,7 +171,7 @@ public class SourceApiV3ResourceTest {
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SourceImpl(() -> mockedWorkerService));
- Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
+ Mockito.doReturn(ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
}
//
@@ -424,9 +426,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testRegisterSourceUploadFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doThrow(new IOException("upload failure")).when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -441,9 +443,9 @@ public class SourceApiV3ResourceTest {
}
public void testRegisterSourceSuccess() throws Exception {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -461,9 +463,9 @@ public class SourceApiV3ResourceTest {
@Test
public void testRegisterSourceConflictingFields() throws Exception {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -504,9 +506,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to register")
public void testRegisterSourceFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -529,9 +531,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
public void testRegisterSourceInterrupted() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -616,9 +618,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateSourceMissingPackage() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateSourceMissingArguments(
tenant,
@@ -640,9 +642,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateSourceMissingTopicName() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateSourceMissingArguments(
tenant,
@@ -664,9 +666,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
public void testUpdateSourceNegativeParallelism() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateSourceMissingArguments(
tenant,
@@ -688,9 +690,9 @@ public class SourceApiV3ResourceTest {
@Test
public void testUpdateSourceChangedParallelism() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateSourceMissingArguments(
tenant,
@@ -712,9 +714,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Destination topics differ")
public void testUpdateSourceChangedTopic() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateSourceMissingArguments(
tenant,
@@ -736,9 +738,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
public void testUpdateSourceZeroParallelism() throws IOException {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateSourceMissingArguments(
tenant,
@@ -773,12 +775,12 @@ public class SourceApiV3ResourceTest {
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
- mockStatic(org.apache.pulsar.functions.utils.Utils.class);
- doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class));
+ mockStatic(FunctionCommon.class);
+ doReturn(String.class).when(FunctionCommon.class);
+ FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
- doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+ doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+ FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -843,12 +845,12 @@ public class SourceApiV3ResourceTest {
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
- mockStatic(org.apache.pulsar.functions.utils.Utils.class);
- doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class));
+ mockStatic(FunctionCommon.class);
+ doReturn(String.class).when(FunctionCommon.class);
+ FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
- doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+ doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+ FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -879,9 +881,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testUpdateSourceUploadFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doThrow(new IOException("upload failure")).when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -896,9 +898,9 @@ public class SourceApiV3ResourceTest {
@Test
public void testUpdateSourceSuccess() throws Exception {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -933,12 +935,12 @@ public class SourceApiV3ResourceTest {
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
- mockStatic(org.apache.pulsar.functions.utils.Utils.class);
- doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class));
+ mockStatic(FunctionCommon.class);
+ doReturn(String.class).when(FunctionCommon.class);
+ FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
- doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
- org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+ doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+ FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -967,9 +969,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to register")
public void testUpdateSourceFailure() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -992,9 +994,9 @@ public class SourceApiV3ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
public void testUpdateSourceInterrupted() throws Exception {
try {
- mockStatic(Utils.class);
- doNothing().when(Utils.class);
- Utils.uploadFileToBookkeeper(
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
@@ -1321,9 +1323,9 @@ public class SourceApiV3ResourceTest {
FunctionDetails.newBuilder().setName("test-3").build()).build();
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
- doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
- doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
- doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+ doReturn(ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+ doReturn(ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+ doReturn(ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
List<String> sourceList = listDefaultSources();
assertEquals(functions, sourceList);