You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/29 04:22:43 UTC
[pulsar] branch master updated: Classloader choice for validating
Source/Sink (#3865)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f3095d8 Classloader choice for validating Source/Sink (#3865)
f3095d8 is described below
commit f3095d8697ccbe62e4676f94e671047a82bebe40
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Mar 28 21:22:37 2019 -0700
Classloader choice for validating Source/Sink (#3865)
* Try both the regular classloader and the NAR classloader when validating sources/sinks
* Fixed test
* Fix unit test
* Added more comments to the code
* rename variables
* Wait for the create to succeed before updating. Otherwise, there might be some remnant producers.
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 8 +++
.../pulsar/functions/utils/SinkConfigUtils.java | 34 +++++++++--
.../pulsar/functions/utils/SourceConfigUtils.java | 32 ++++++++--
.../functions/worker/rest/api/ComponentImpl.java | 3 +
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 2 +-
.../rest/api/v3/SourceApiV3ResourceTest.java | 2 +-
.../integration/functions/PulsarFunctionsTest.java | 68 ++++++++++++++++++++++
7 files changed, 137 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 422e9ed..042a952 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -599,6 +599,14 @@ public class PulsarFunctionE2ETest {
SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic);
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
+ retryStrategically((test) -> {
+ try {
+ return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 10, 150);
+
admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
retryStrategically((test) -> {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 42538a2..81554c4 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -24,6 +24,7 @@ import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -46,6 +47,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.Utils.getSinkType;
+@Slf4j
public class SinkConfigUtils {
@Getter
@@ -296,14 +298,37 @@ public class SinkConfigUtils {
}
String sinkClassName;
- ClassLoader classLoader;
+ final Class<?> typeArg;
+ final ClassLoader classLoader;
if (!isEmpty(sinkConfig.getClassName())) {
sinkClassName = sinkConfig.getClassName();
+ // We really don't know if we should use nar class loader or regular classloader
+ ClassLoader jarClassLoader = null;
+ ClassLoader narClassLoader = null;
try {
- classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
- throw new IllegalArgumentException("Invalid Sink Jar");
}
+ try {
+ narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ } catch (Exception e) {
+ }
+ if (jarClassLoader == null && narClassLoader == null) {
+ throw new IllegalArgumentException("Invalid Sink Package");
+ }
+ // We use typeArg and classLoader as arguments for lambda functions that require them to be final
+ // Thus we use these tmp vars
+ Class<?> tmptypeArg;
+ ClassLoader tmpclassLoader;
+ try {
+ tmptypeArg = getSinkType(sinkClassName, narClassLoader);
+ tmpclassLoader = narClassLoader;
+ } catch (Exception e) {
+ tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
+ tmpclassLoader = jarClassLoader;
+ }
+ typeArg = tmptypeArg;
+ classLoader = tmpclassLoader;
} else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
@@ -316,10 +341,9 @@ public class SinkConfigUtils {
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
}
+ typeArg = getSinkType(sinkClassName, classLoader);
}
- Class<?> typeArg = getSinkType(sinkClassName, classLoader);
-
if (sinkConfig.getTopicToSerdeClassName() != null) {
sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index f721112..9a32085 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -209,14 +209,37 @@ public class SourceConfigUtils {
}
String sourceClassName;
- ClassLoader classLoader;
+ final Class<?> typeArg;
+ final ClassLoader classLoader;
if (!isEmpty(sourceConfig.getClassName())) {
sourceClassName = sourceConfig.getClassName();
+ // We really don't know if we should use nar class loader or regular classloader
+ ClassLoader jarClassLoader = null;
+ ClassLoader narClassLoader = null;
try {
- classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
- throw new IllegalArgumentException("Invalid Source Jar");
}
+ try {
+ narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+ } catch (Exception e) {
+ }
+ if (jarClassLoader == null && narClassLoader == null) {
+ throw new IllegalArgumentException("Invalid Source Package");
+ }
+ // We use typeArg and classLoader as arguments for lambda functions that require them to be final
+ // Thus we use these tmp vars
+ Class<?> tmptypeArg;
+ ClassLoader tmpclassLoader;
+ try {
+ tmptypeArg = getSourceType(sourceClassName, narClassLoader);
+ tmpclassLoader = narClassLoader;
+ } catch (Exception e) {
+ tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
+ tmpclassLoader = jarClassLoader;
+ }
+ typeArg = tmptypeArg;
+ classLoader = tmpclassLoader;
} else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
@@ -229,10 +252,9 @@ public class SourceConfigUtils {
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract source class from archive", e1);
}
+ typeArg = getSourceType(sourceClassName, classLoader);
}
- Class<?> typeArg = getSourceType(sourceClassName, classLoader);
-
// Only one of serdeClassName or schemaType should be set
if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0ed9fd1..fd75da3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -588,6 +588,9 @@ public abstract class ComponentImpl {
} else if (uploadedInputStream != null) {
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType);
+ } else if (existingComponent.getPackageLocation().getPackagePath().startsWith("builtin://")) {
+ functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, null,
+ null, functionDetailsJson, mergedComponentConfigJson, componentType);
} else {
functionDetails = validateUpdateRequestParamsWithExistingMetadata(
tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 192fd1a..b723411 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -361,7 +361,7 @@ public class SinkApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Jar")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Package")
public void testRegisterSinkHttpUrl() {
try {
testRegisterSinkMissingArguments(
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 25a0845..adaa606 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -325,7 +325,7 @@ public class SourceApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Source Jar")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Source Package")
public void testRegisterSourceHttpUrl() {
try {
testRegisterSourceMissingArguments(
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d8e5f50..a60eb47 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -172,6 +172,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// validate the sink result
tester.validateSinkResult(kvs);
+ // update the sink
+ updateSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
+
// delete the sink
deleteSink(tenant, namespace, sinkName);
@@ -220,6 +223,45 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
result.getStdout());
}
+ protected void updateSinkConnector(SinkTester tester,
+ String tenant,
+ String namespace,
+ String sinkName,
+ String inputTopicName) throws Exception {
+ String[] commands;
+ if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) {
+ commands = new String[] {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink", "update",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName,
+ "--sink-type", tester.sinkType().name().toLowerCase(),
+ "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+ "--inputs", inputTopicName,
+ "--parallelism", "2"
+ };
+ } else {
+ commands = new String[] {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName,
+ "--archive", tester.getSinkArchive(),
+ "--classname", tester.getSinkClassName(),
+ "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+ "--inputs", inputTopicName,
+ "--parallelism", "2"
+ };
+ }
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Updated successfully\""),
+ result.getStdout());
+ }
+
protected void getSinkInfoSuccess(SinkTester tester,
String tenant,
String namespace,
@@ -422,6 +464,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// validate the source result
validateSourceResult(consumer, kvs);
+ // update the source connector
+ updateSourceConnector(tester, tenant, namespace, sourceName, outputTopicName);
+
// delete the source
deleteSource(tenant, namespace, sourceName);
@@ -455,6 +500,29 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
result.getStdout());
}
+ protected void updateSourceConnector(SourceTester tester,
+ String tenant,
+ String namespace,
+ String sourceName,
+ String outputTopicName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source", "update",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName,
+ "--source-type", tester.sourceType(),
+ "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
+ "--destinationTopicName", outputTopicName,
+ "--parallelism", "2"
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Updated successfully\""),
+ result.getStdout());
+ }
+
protected void getSourceInfoSuccess(SourceTester tester,
String tenant,
String namespace,