You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/03 21:31:28 UTC
[pulsar] branch master updated: fix issue when submitting NAR via file url (#4577)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e8025d5 fix issue when submitting NAR via file url (#4577)
e8025d5 is described below
commit e8025d50c5d2cf0a632ad1573308b676d0607923
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Jul 3 14:31:23 2019 -0700
fix issue when submitting NAR via file url (#4577)
* fix issue when submitting NAR via file url
* fix unit tests
* add more specific errors
* fix test
---
.../worker/PulsarFunctionLocalRunTest.java | 21 ++-
.../org/apache/pulsar/functions/LocalRunner.java | 1 -
.../pulsar/functions/utils/SinkConfigUtils.java | 145 +++++++++++++--------
.../pulsar/functions/utils/SourceConfigUtils.java | 122 ++++++++++-------
.../functions/worker/rest/api/FunctionsImpl.java | 1 -
.../functions/worker/rest/api/SinksImpl.java | 1 -
.../functions/worker/rest/api/SourcesImpl.java | 1 -
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 17 ++-
.../rest/api/v3/SourceApiV3ResourceTest.java | 5 +-
9 files changed, 192 insertions(+), 122 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index bdcd27d..db7dbf2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -599,12 +599,11 @@ public class PulsarFunctionLocalRunTest {
testPulsarSourceLocalRun(null);
}
- // TODO bug to fix involving submitting a NAR via URI file:///tmp/pulsar-io-twitter-0.0.1.nar
-// @Test(timeOut = 20000)
-// public void testPulsarSourceLocalRunWithFile() throws Exception {
-// String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
-// testPulsarSourceStats(jarFilePathUrl);
-// }
+ @Test(timeOut = 20000)
+ public void testPulsarSourceLocalRunWithFile() throws Exception {
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ testPulsarSourceLocalRun(jarFilePathUrl);
+ }
@Test(timeOut = 40000)
public void testPulsarSourceLocalRunWithUrl() throws Exception {
@@ -705,11 +704,11 @@ public class PulsarFunctionLocalRunTest {
testPulsarSinkStats(null);
}
-// @Test(timeOut = 20000)
-// public void testPulsarSinkStatsWithFile() throws Exception {
-// String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
-// testPulsarSinkStats(jarFilePathUrl);
-// }
+ @Test(timeOut = 20000)
+ public void testPulsarSinkStatsWithFile() throws Exception {
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+ testPulsarSinkStats(jarFilePathUrl);
+ }
@Test(timeOut = 40000)
public void testPulsarSinkStatsWithUrl() throws Exception {
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 1540744..f4c6eca 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -248,7 +248,6 @@ public class LocalRunner {
.loadClass(LocalRunner.class.getName())
.getProtectionDomain().getCodeSource().getLocation().getFile();
}
- log.info("userCodeFile: {}", userCodeFile);
String builtInSource = isBuiltInSource(userCodeFile);
if (builtInSource != null) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 42eedc3..f0a39b9 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -42,7 +42,11 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Path;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -283,7 +287,7 @@ public class SinkConfigUtils {
}
public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath,
- File uploadedInputStreamAsFile) {
+ File sinkPackageFile) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
}
@@ -318,79 +322,112 @@ public class SinkConfigUtils {
throw new IllegalArgumentException("Sink timeout must be a positive number");
}
- String sinkClassName;
- final Class<?> typeArg;
- final ClassLoader classLoader;
- if (!isEmpty(sinkConfig.getClassName())) {
- sinkClassName = sinkConfig.getClassName();
- // We really don't know if we should use nar class loader or regular classloader
- ClassLoader jarClassLoader = null;
- ClassLoader narClassLoader = null;
- try {
- jarClassLoader = FunctionCommon.extractClassLoader(archivePath, uploadedInputStreamAsFile);
- } catch (Exception e) {
+ if (archivePath == null && sinkPackageFile == null) {
+ throw new IllegalArgumentException("Sink package is not provided");
+ }
+
+ Class<?> typeArg;
+ ClassLoader classLoader;
+ String sinkClassName = sinkConfig.getClassName();
+ ClassLoader jarClassLoader = null;
+ ClassLoader narClassLoader = null;
+
+ Exception jarClassLoaderException = null;
+ Exception narClassLoaderException = null;
+
+ try {
+ jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sinkPackageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
+ }
+ try {
+ narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile);
+ } catch (Exception e) {
+ narClassLoaderException = e;
+ }
+
+ // if sink class name is not provided, we can only try to load archive as a NAR
+ if (isEmpty(sinkClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException("Sink package does not have the correct format. " +
+ "Pulsar cannot determine if the package is a NAR package or JAR package." +
+ "Sink classname is not provided and attempts to load it as a NAR package produced error: "
+ + narClassLoaderException.getMessage());
}
try {
- narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, uploadedInputStreamAsFile);
- } catch (Exception e) {
- }
- if (jarClassLoader == null && narClassLoader == null) {
- throw new IllegalArgumentException("Invalid Sink Package");
+ sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to extract Sink class from archive", e);
}
- // We use typeArg and classLoader as arguments for lambda functions that require them to be final
- // Thus we use these tmp vars
- Class<?> tmptypeArg;
- ClassLoader tmpclassLoader;
try {
- tmptypeArg = getSinkType(sinkClassName, narClassLoader);
- tmpclassLoader = narClassLoader;
- } catch (Exception e) {
+ typeArg = getSinkType(sinkClassName, narClassLoader);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ String.format("Sink class %s must be in class path", sinkClassName), e);
+ }
+
+ } else {
+ // if sink class name is provided, we need to try to load it as a JAR and as a NAR.
+ if (jarClassLoader != null) {
+ try {
+ typeArg = getSinkType(sinkClassName, jarClassLoader);
+ classLoader = jarClassLoader;
+ } catch (ClassNotFoundException e) {
+ // class not found in JAR try loading as a NAR and searching for the class
+ if (narClassLoader != null) {
+ try {
+ typeArg = getSinkType(sinkClassName, narClassLoader);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException e1) {
+ throw new IllegalArgumentException(
+ String.format("Sink class %s must be in class path", sinkClassName), e1);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Sink class %s must be in class path", sinkClassName), e);
+ }
+ }
+ } else if (narClassLoader != null) {
try {
- tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
+ typeArg = getSinkType(sinkClassName, narClassLoader);
+ classLoader = narClassLoader;
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e1);
}
- tmpclassLoader = jarClassLoader;
- }
- typeArg = tmptypeArg;
- classLoader = tmpclassLoader;
- } else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
- throw new IllegalArgumentException("Class-name must be present for archive with file-url");
- } else {
- classLoader = FunctionCommon.extractNarClassLoader(archivePath, uploadedInputStreamAsFile);
- if (classLoader == null) {
- throw new IllegalArgumentException("Sink Package is not provided");
- }
- try {
- sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
- } catch (IOException e1) {
- throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
- }
- try {
- typeArg = getSinkType(sinkClassName, classLoader);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(
- String.format("Sink class %s must be in class path", sinkClassName), e);
+ } else {
+ StringBuilder errorMsg = new StringBuilder("Sink package does not have the correct format." +
+ " Pulsar cannot determine if the package is a NAR package or JAR package.");
+
+ if (jarClassLoaderException != null) {
+ errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
+ }
+
+ if (narClassLoaderException != null) {
+ errorMsg.append("Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
+ }
+
+ throw new IllegalArgumentException(errorMsg.toString());
}
}
if (sinkConfig.getTopicToSerdeClassName() != null) {
- sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
- ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
- });
+ for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
+ ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
+ }
}
if (sinkConfig.getTopicToSchemaType() != null) {
- sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
+ for (String schemaType : sinkConfig.getTopicToSchemaType().values()) {
ValidatorUtils.validateSchema(schemaType, typeArg, classLoader, true);
- });
+ }
}
// topicsPattern does not need checks
if (sinkConfig.getInputSpecs() != null) {
- sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
+ for (ConsumerConfig consumerSpec : sinkConfig.getInputSpecs().values()) {
// Only one is set
if (!isEmpty(consumerSpec.getSerdeClassName()) && !isEmpty(consumerSpec.getSchemaType())) {
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
@@ -401,7 +438,7 @@ public class SinkConfigUtils {
if (!isEmpty(consumerSpec.getSchemaType())) {
ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, classLoader, true);
}
- });
+ }
}
return new ExtractedSinkDetails(sinkClassName, typeArg.getName());
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index edbb6d7..149204c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -222,61 +222,93 @@ public class SourceConfigUtils {
if (sourceConfig.getResources() != null) {
ResourceConfigUtils.validate(sourceConfig.getResources());
}
+ if (archivePath == null && sourcePackageFile == null) {
+ throw new IllegalArgumentException("Source package is not provided");
+ }
- String sourceClassName;
- final Class<?> typeArg;
- final ClassLoader classLoader;
- if (!isEmpty(sourceConfig.getClassName())) {
- sourceClassName = sourceConfig.getClassName();
- // We really don't know if we should use nar class loader or regular classloader
- ClassLoader jarClassLoader = null;
- ClassLoader narClassLoader = null;
- try {
- jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile);
- } catch (Exception e) {
+ Class<?> typeArg;
+ ClassLoader classLoader;
+ String sourceClassName = sourceConfig.getClassName();
+ ClassLoader jarClassLoader = null;
+ ClassLoader narClassLoader = null;
+
+ Exception jarClassLoaderException = null;
+ Exception narClassLoaderException = null;
+
+ try {
+ jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
+ }
+ try {
+ narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
+ } catch (Exception e) {
+ narClassLoaderException = e;
+ }
+
+ // if source class name is not provided, we can only try to load archive as a NAR
+ if (isEmpty(sourceClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException("Source package does not have the correct format. " +
+ "Pulsar cannot determine if the package is a NAR package or JAR package." +
+ "Source classname is not provided and attempts to load it as a NAR package produced the following error.",
+ narClassLoaderException);
}
try {
- narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
- } catch (Exception e) {
- }
- if (jarClassLoader == null && narClassLoader == null) {
- throw new IllegalArgumentException("Invalid Source Package");
+ sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to extract source class from archive", e);
}
- // We use typeArg and classLoader as arguments for lambda functions that require them to be final
- // Thus we use these tmp vars
- Class<?> tmptypeArg;
- ClassLoader tmpclassLoader;
try {
- tmptypeArg = getSourceType(sourceClassName, narClassLoader);
- tmpclassLoader = narClassLoader;
- } catch (Exception e) {
+ typeArg = getSourceType(sourceClassName, narClassLoader);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ String.format("Source class %s must be in class path", sourceClassName), e);
+ }
+
+ } else {
+ // if source class name is provided, we need to try to load it as a JAR and as a NAR.
+ if (jarClassLoader != null) {
try {
- tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
+ typeArg = getSourceType(sourceClassName, jarClassLoader);
+ classLoader = jarClassLoader;
+ } catch (ClassNotFoundException e) {
+ // class not found in JAR try loading as a NAR and searching for the class
+ if (narClassLoader != null) {
+ try {
+ typeArg = getSourceType(sourceClassName, narClassLoader);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException e1) {
+ throw new IllegalArgumentException(
+ String.format("Source class %s must be in class path", sourceClassName), e1);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Source class %s must be in class path", sourceClassName), e);
+ }
+ }
+ } else if (narClassLoader != null) {
+ try {
+ typeArg = getSourceType(sourceClassName, narClassLoader);
+ classLoader = narClassLoader;
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e1);
}
- tmpclassLoader = jarClassLoader;
- }
- typeArg = tmptypeArg;
- classLoader = tmpclassLoader;
- } else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
- throw new IllegalArgumentException("Class-name must be present for archive with file-url");
- } else {
- classLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
- if (classLoader == null) {
- throw new IllegalArgumentException("Source Package is not provided");
- }
- try {
- sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) classLoader);
- } catch (IOException e1) {
- throw new IllegalArgumentException("Failed to extract source class from archive", e1);
- }
- try {
- typeArg = getSourceType(sourceClassName, classLoader);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(
- String.format("Source class %s must be in class path", sourceClassName), e);
+ } else {
+ StringBuilder errorMsg = new StringBuilder("Source package does not have the correct format." +
+ " Pulsar cannot determine if the package is a NAR package or JAR package.");
+
+ if (jarClassLoaderException != null) {
+ errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
+ }
+
+ if (narClassLoaderException != null) {
+ errorMsg.append("Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
+ }
+
+ throw new IllegalArgumentException(errorMsg.toString());
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index af23496..d96ed30 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -339,7 +339,6 @@ public class FunctionsImpl extends ComponentImpl {
componentPackageFile = FunctionCommon.createPkgTempFile();
componentPackageFile.deleteOnExit();
- log.info("componentPackageFile: {}", componentPackageFile);
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 9052266..659cb66 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -344,7 +344,6 @@ public class SinksImpl extends ComponentImpl {
componentPackageFile = FunctionCommon.createPkgTempFile();
componentPackageFile.deleteOnExit();
- log.info("componentPackageFile: {}", componentPackageFile);
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index e14f167..e9cfeb0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -341,7 +341,6 @@ public class SourcesImpl extends ComponentImpl {
componentPackageFile = FunctionCommon.createPkgTempFile();
componentPackageFile.deleteOnExit();
- log.info("componentPackageFile: {}", componentPackageFile);
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 793e9cd..41b3204 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker.rest.api.v3;
import com.google.common.collect.Lists;
-import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
@@ -243,7 +242,7 @@ public class SinkApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Package is not provided")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink package is not provided")
public void testRegisterSinkMissingPackage() {
try {
testRegisterSinkMissingArguments(
@@ -283,7 +282,10 @@ public class SinkApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "zip file is empty")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink package does not have the" +
+ " correct format. Pulsar cannot determine if the package is a NAR package" +
+ " or JAR package.Sink classname is not provided and attempts to load it as a NAR package produced error: " +
+ "zip file is empty")
public void testRegisterSinkMissingPackageDetails() {
try {
testRegisterSinkMissingArguments(
@@ -303,7 +305,7 @@ public class SinkApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract Sink class from archive")
public void testRegisterSinkInvalidJarNoSink() throws IOException {
try {
FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH);
@@ -948,6 +950,7 @@ public class SinkApiV3ResourceTest {
anyString(),
any(File.class),
any(Namespace.class));
+ PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
@@ -961,7 +964,7 @@ public class SinkApiV3ResourceTest {
}
@Test
- public void testUpdateSinkWithUrl() throws IOException, ClassNotFoundException {
+ public void testUpdateSinkWithUrl() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
String filePackageUrl = "file://" + JAR_FILE_PATH;
@@ -982,6 +985,7 @@ public class SinkApiV3ResourceTest {
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
+ PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
@@ -989,7 +993,6 @@ public class SinkApiV3ResourceTest {
doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -1019,6 +1022,7 @@ public class SinkApiV3ResourceTest {
anyString(),
any(File.class),
any(Namespace.class));
+ PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
@@ -1044,6 +1048,7 @@ public class SinkApiV3ResourceTest {
anyString(),
any(File.class),
any(Namespace.class));
+ PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 17b4595..276f3c0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -263,7 +263,7 @@ public class SourceApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Package is not provided")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source package is not provided")
public void testRegisterSourceMissingPackage() {
try {
testRegisterSourceMissingArguments(
@@ -979,7 +979,7 @@ public class SourceApiV3ResourceTest {
}
@Test
- public void testUpdateSourceWithUrl() throws IOException, ClassNotFoundException {
+ public void testUpdateSourceWithUrl() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
String filePackageUrl = "file://" + JAR_FILE_PATH;
@@ -1000,6 +1000,7 @@ public class SourceApiV3ResourceTest {
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
+ PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));