You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/22 15:24:30 UTC
[pulsar] branch master updated: Added ability to add annotations to
Connector Configs (#6983)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c8170b7 Added ability to add annotations to Connector Configs (#6983)
c8170b7 is described below
commit c8170b7c9f0232c9a005b65b6b146e0417ec3190
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 22 08:24:13 2020 -0700
Added ability to add annotations to Connector Configs (#6983)
* Added sourceConfigClass and sinkConfigClass
* Add Validator annotation helpers to validate class parameters
* Fix build errors
* Take feedback into account
* Connected with validation
* Fix bugs
* Added tests
* Fix class name
* Address feedback
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../pulsar/common/io/ConnectorDefinition.java | 22 ++++++++++
.../org/apache/pulsar/functions/LocalRunner.java | 8 ++--
.../pulsar/functions/worker/WorkerConfig.java | 5 +++
.../pulsar/functions/utils/SinkConfigUtils.java | 37 ++++++++++++++++-
.../pulsar/functions/utils/SourceConfigUtils.java | 34 ++++++++++++++-
.../pulsar/functions/utils/io/ConnectorUtils.java | 16 ++++----
.../functions/utils/SinkConfigUtilsTest.java | 47 ++++++++++++++++++++-
.../functions/utils/SourceConfigUtilsTest.java | 48 +++++++++++++++++++++-
.../functions/worker/rest/api/SinksImpl.java | 3 +-
.../functions/worker/rest/api/SourcesImpl.java | 3 +-
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 3 +-
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 2 +
.../resources/META-INF/services/pulsar-io.yaml | 2 +
.../resources/META-INF/services/pulsar-io.yaml | 2 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 4 +-
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
29 files changed, 229 insertions(+), 21 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java
index d1bb334..99c3738 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java
@@ -51,4 +51,26 @@ public class ConnectorDefinition {
* <p>If not defined, it will be assumed this connector cannot act as a data sink.
*/
private String sinkClass;
+
+ /**
+ * The class name for the source config implementation.
+ * Most of the sources are using a config class for managing their config
+ * and directly convert the supplied Map object at open to this object.
+ * These connector can declare their config class in this variable that will allow
+ * the framework to check for config parameter checking at submission time.
+ *
+ * <p>If not defined, the framework will not be able to do any submission time checks.
+ */
+ private String sourceConfigClass;
+
+ /**
+ * The class name for the sink config implementation.
+ * Most of the sink are using a config class for managing their config
+ * and directly convert the supplied Map object at open to this object.
+ * These connector can declare their config class in this variable that will allow
+ * the framework to check for config parameter checking at submission time.
+ *
+ * <p>If not defined, the framework will not be able to do any submission time checks.
+ */
+ private String sinkConfigClass;
}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 4f63bd2..3170e6e 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -257,14 +257,14 @@ public class LocalRunner {
if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
+ functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory, true));
} else {
File file = new File(userCodeFile);
if (!file.exists()) {
throw new RuntimeException("Source archive (" + userCodeFile + ") does not exist");
}
- functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
+ functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory, true));
}
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
@@ -285,13 +285,13 @@ public class LocalRunner {
if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
+ functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory, true));
} else {
File file = new File(userCodeFile);
if (!file.exists()) {
throw new RuntimeException("Sink archive does not exist");
}
- functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
+ functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory, true));
}
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 4fd21f0..b1cf7cb 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -141,6 +141,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
+ category = CATEGORY_CONNECTORS,
+ doc = "Should we validate connector config during submission"
+ )
+ private Boolean validateConnectorConfig = false;
+ @FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function metadata"
)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 57aa8ec..7bb9f77 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.functions.utils;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
@@ -31,9 +32,13 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.validator.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -302,7 +307,8 @@ public class SinkConfigUtils {
}
public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath,
- File sinkPackageFile, String narExtractionDirectory) {
+ File sinkPackageFile, String narExtractionDirectory,
+ boolean validateConnectorConfig) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
}
@@ -374,6 +380,9 @@ public class SinkConfigUtils {
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract Sink class from archive", e);
}
+ if (validateConnectorConfig) {
+ validateConnectorConfig(sinkConfig, (NarClassLoader) narClassLoader);
+ }
try {
typeArg = getSinkType(sinkClassName, narClassLoader);
classLoader = narClassLoader;
@@ -398,12 +407,18 @@ public class SinkConfigUtils {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e1);
}
+ if (validateConnectorConfig) {
+ validateConnectorConfig(sinkConfig, (NarClassLoader) narClassLoader);
+ }
} else {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e);
}
}
} else if (narClassLoader != null) {
+ if (validateConnectorConfig) {
+ validateConnectorConfig(sinkConfig, (NarClassLoader) narClassLoader);
+ }
try {
typeArg = getSinkType(sinkClassName, narClassLoader);
classLoader = narClassLoader;
@@ -580,4 +595,24 @@ public class SinkConfigUtils {
}
return mergedConfig;
}
+
+ public static void validateConnectorConfig(SinkConfig sinkConfig, ClassLoader classLoader) {
+ try {
+ ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(classLoader);
+ if (defn.getSinkConfigClass() != null) {
+ Class configClass = Class.forName(defn.getSinkConfigClass(), true, classLoader);
+ Object configObject =
+ ObjectMapperFactory.getThreadLocal().convertValue(sinkConfig.getConfigs(), configClass);
+ if (configObject != null) {
+ ConfigValidation.validateConfig(configObject);
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error validating sink config", e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Could not find sink config class", e);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Could not validate sink config: " + e.getMessage());
+ }
+ }
}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index b184ef0..38939a6 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.functions.utils;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
@@ -28,10 +29,12 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.validator.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -209,7 +212,8 @@ public class SourceConfigUtils {
}
public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath,
- File sourcePackageFile, String narExtractionDirectory) {
+ File sourcePackageFile, String narExtractionDirectory,
+ boolean validateConnectorConfig) {
if (isEmpty(sourceConfig.getTenant())) {
throw new IllegalArgumentException("Source tenant cannot be null");
}
@@ -268,6 +272,9 @@ public class SourceConfigUtils {
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract source class from archive", e);
}
+ if (validateConnectorConfig) {
+ validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
+ }
try {
typeArg = getSourceType(sourceClassName, narClassLoader);
classLoader = narClassLoader;
@@ -292,12 +299,18 @@ public class SourceConfigUtils {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e1);
}
+ if (validateConnectorConfig) {
+ validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
+ }
} else {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e);
}
}
} else if (narClassLoader != null) {
+ if (validateConnectorConfig) {
+ validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
+ }
try {
typeArg = getSourceType(sourceClassName, narClassLoader);
classLoader = narClassLoader;
@@ -386,4 +399,23 @@ public class SourceConfigUtils {
return mergedConfig;
}
+ public static void validateConnectorConfig(SourceConfig sourceConfig, ClassLoader classLoader) {
+ try {
+ ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(classLoader);
+ if (defn.getSourceConfigClass() != null) {
+ Class configClass = Class.forName(defn.getSourceConfigClass(), true, classLoader);
+ Object configObject = ObjectMapperFactory.getThreadLocal().convertValue(sourceConfig.getConfigs(), configClass);
+ if (configObject != null) {
+ ConfigValidation.validateConfig(configObject);
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error validating source config", e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Could not find source config class");
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Could not validate source config: " + e.getMessage());
+ }
+ }
+
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index a78c24c..e96afdd 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -74,11 +74,8 @@ public class ConnectorUtils {
* Extract the Pulsar IO Sink class from a connector archive.
*/
public static String getIOSinkClass(ClassLoader classLoader) throws IOException {
+ ConnectorDefinition conf = getConnectorDefinition(classLoader);
NarClassLoader ncl = (NarClassLoader) classLoader;
- String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
- ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
- ConnectorDefinition.class);
if (StringUtils.isEmpty(conf.getSinkClass())) {
throw new IOException(
String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
@@ -100,12 +97,17 @@ public class ConnectorUtils {
public static ConnectorDefinition getConnectorDefinition(String narPath, String narExtractionDirectory) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
- String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
- return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
+ return getConnectorDefinition(ncl);
}
}
+ public static ConnectorDefinition getConnectorDefinition(ClassLoader classLoader) throws IOException {
+ NarClassLoader narClassLoader = (NarClassLoader) classLoader;
+ String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+ return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
+ }
+
public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException {
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 47515d6..8e9df03 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -19,12 +19,22 @@
package org.apache.pulsar.functions.utils;
import com.google.gson.Gson;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockTestCase;
import org.testng.annotations.Test;
import java.io.IOException;
@@ -33,12 +43,26 @@ import java.util.HashMap;
import java.util.Map;
import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
-import static org.testng.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.*;
/**
* Unit test of {@link Reflections}.
*/
-public class SinkConfigUtilsTest {
+@PrepareForTest(ConnectorUtils.class)
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "javax.xml.*", "org.xml.*", "org.w3c.dom.*", "org.springframework.context.*", "org.apache.log4j.*", "com.sun.org.apache.xerces.*", "javax.management.*" })
+public class SinkConfigUtilsTest extends PowerMockTestCase {
+
+ private ConnectorDefinition defn;
+
+ @Data
+ @Accessors(chain = true)
+ @NoArgsConstructor
+ public static class TestSinkConfig {
+ @ConfigValidationAnnotations.NotNull
+ private String configParameter;
+ }
@Test
public void testConvertBackFidelity() throws IOException {
@@ -279,6 +303,25 @@ public class SinkConfigUtilsTest {
);
}
+ @Test
+ public void testValidateConfig() throws IOException {
+ mockStatic(ConnectorUtils.class);
+ defn = new ConnectorDefinition();
+ defn.setSinkConfigClass(TestSinkConfig.class.getName());
+ PowerMockito.when(ConnectorUtils.getConnectorDefinition(any())).thenReturn(defn);
+
+ SinkConfig sinkConfig = createSinkConfig();
+
+ // Good config
+ sinkConfig.getConfigs().put("configParameter", "Test");
+ SinkConfigUtils.validateConnectorConfig(sinkConfig, Thread.currentThread().getContextClassLoader());
+
+ // Bad config
+ sinkConfig.getConfigs().put("configParameter", null);
+ Exception e = expectThrows(IllegalArgumentException.class, () -> SinkConfigUtils.validateConnectorConfig(sinkConfig, Thread.currentThread().getContextClassLoader()));
+ assertTrue(e.getMessage().contains("Could not validate sink config: Field 'configParameter' cannot be null!"));
+ }
+
private SinkConfig createSinkConfig() {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant("test-tenant");
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 3c6ee73..4c80b23 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -19,11 +19,22 @@
package org.apache.pulsar.functions.utils;
import com.google.gson.Gson;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockTestCase;
import org.testng.annotations.Test;
import java.io.IOException;
@@ -32,12 +43,26 @@ import java.util.HashMap;
import java.util.Map;
import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
-import static org.testng.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.*;
/**
* Unit test of {@link Reflections}.
*/
-public class SourceConfigUtilsTest {
+@PrepareForTest(ConnectorUtils.class)
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "javax.xml.*", "org.xml.*", "org.w3c.dom.*", "org.springframework.context.*", "org.apache.log4j.*", "com.sun.org.apache.xerces.*", "javax.management.*" })
+public class SourceConfigUtilsTest extends PowerMockTestCase {
+
+ private ConnectorDefinition defn;
+
+ @Data
+ @Accessors(chain = true)
+ @NoArgsConstructor
+ public static class TestSourceConfig {
+ @ConfigValidationAnnotations.NotNull
+ private String configParameter;
+ }
@Test
public void testConvertBackFidelity() throws IOException {
@@ -213,6 +238,25 @@ public class SourceConfigUtilsTest {
);
}
+ @Test
+ public void testValidateConfig() throws IOException {
+ mockStatic(ConnectorUtils.class);
+ defn = new ConnectorDefinition();
+ defn.setSourceConfigClass(SourceConfigUtilsTest.TestSourceConfig.class.getName());
+ PowerMockito.when(ConnectorUtils.getConnectorDefinition(any())).thenReturn(defn);
+
+ SourceConfig sourceConfig = createSourceConfig();
+
+ // Good config
+ sourceConfig.getConfigs().put("configParameter", "Test");
+ SourceConfigUtils.validateConnectorConfig(sourceConfig, Thread.currentThread().getContextClassLoader());
+
+ // Bad config
+ sourceConfig.getConfigs().put("configParameter", null);
+ Exception e = expectThrows(IllegalArgumentException.class, () -> SourceConfigUtils.validateConnectorConfig(sourceConfig, Thread.currentThread().getContextClassLoader()));
+ assertTrue(e.getMessage().contains("Could not validate source config: Field 'configParameter' cannot be null!"));
+ }
+
private SourceConfig createSourceConfig() {
SourceConfig sourceConfig = new SourceConfig();
sourceConfig.setTenant("test-tenant");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 560c8cb..2a0d3c3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -689,7 +689,8 @@ public class SinksImpl extends ComponentImpl {
}
}
SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath,
- componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+ componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory(),
+ worker().getWorkerConfig().getValidateConnectorConfig());
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 4f434fa..97c22eb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -685,7 +685,8 @@ public class SourcesImpl extends ComponentImpl {
}
}
SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath,
- sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+ sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory(),
+ worker().getWorkerConfig().getValidateConnectorConfig());
return SourceConfigUtils.convert(sourceConfig, sourceDetails);
}
}
diff --git a/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
index f2a7ab5..d8f20ec 100644
--- a/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: aerospike
description: Aerospike database sink
sinkClass: org.apache.pulsar.io.aerospike.AerospikeStringSink
+sinkConfigClass: org.apache.pulsar.io.aerospike.AerospikeSinkConfig
diff --git a/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
index 2ecf4df..6ea58b0 100644
--- a/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: canal
description: canal source and read data from mysql
sourceClass: org.apache.pulsar.io.canal.CanalStringSource
+sourceConfigClass: org.apache.pulsar.io.canal.CanalSourceConfig
diff --git a/pulsar-io/cassandra/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/cassandra/src/main/resources/META-INF/services/pulsar-io.yaml
index 0ceb7442..b486321 100644
--- a/pulsar-io/cassandra/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/cassandra/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: cassandra
description: Writes data into Cassandra
sinkClass: org.apache.pulsar.io.cassandra.CassandraStringSink
+sinkConfigClass: org.apache.pulsar.io.cassandra.CassandraSinkConfig
diff --git a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
index 97789e9..0603290 100644
--- a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: elastic_search
description: Writes data into Elastic Search
sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
+sinkConfigClass: org.apache.pulsar.io.elasticsearch.ElasticSearchConfig
diff --git a/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
index df455c2..4651054 100644
--- a/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: file
description: Reads data from local filesystem
sourceClass: org.apache.pulsar.io.file.FileSource
+sourceConfigClass: org.apache.pulsar.io.file.FileSourceConfig
diff --git a/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
index 0e578b8..580e528 100644
--- a/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,4 +19,5 @@
name: flume
description: flume source and sink connector
sourceClass: org.apache.pulsar.io.flume.source.StringSource
-sinkClass: org.apache.pulsar.io.flume.sink.StringSink
\ No newline at end of file
+sinkClass: org.apache.pulsar.io.flume.sink.StringSink
+sinkConfigClass: org.apache.pulsar.io.flume.FlumeConfig
diff --git a/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
index e0082d2..b2dff63 100644
--- a/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: hbase
description: Writes data into hbase table
sinkClass: org.apache.pulsar.io.hbase.sink.HbaseGenericRecordSink
+sinkConfigClass: org.apache.pulsar.io.hbase.sink.HbaseSinkConfig
diff --git a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
index 92dd642..8bc3395 100644
--- a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: hdfs2
description: Writes data into HDFS 2.x
sinkClass: org.apache.pulsar.io.hdfs2.sink.text.HdfsStringSink
+sinkConfigClass: org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig
diff --git a/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml
index bb672b3..58c7109 100644
--- a/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: hdfs3
description: Writes data into HDFS 3.x
sinkClass: org.apache.pulsar.io.hdfs3.sink.text.HdfsStringSink
+sinkConfigClass: org.apache.pulsar.io.hdfs3.sink.HdfsSinkConfig
diff --git a/pulsar-io/influxdb/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/influxdb/src/main/resources/META-INF/services/pulsar-io.yaml
index d97451d..28e046d 100644
--- a/pulsar-io/influxdb/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/influxdb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: influxdb
description: Writes data into InfluxDB database
sinkClass: org.apache.pulsar.io.influxdb.InfluxDBGenericRecordSink
+sinkConfigClass: org.apache.pulsar.io.influxdb.v2.InfluxDBSinkConfig
diff --git a/pulsar-io/jdbc/sqlite/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/sqlite/src/main/resources/META-INF/services/pulsar-io.yaml
index a1b9afd..8cd2e12 100644
--- a/pulsar-io/jdbc/sqlite/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/jdbc/sqlite/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: jdbc-sqlite
description: JDBC sink for SQLite
sinkClass: org.apache.pulsar.io.jdbc.SqliteJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
diff --git a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
index c3fd86d..b718587 100644
--- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -21,3 +21,5 @@ name: kafka
description: Kafka source and sink connector
sourceClass: org.apache.pulsar.io.kafka.KafkaBytesSource
sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink
+sourceConfigClass: org.apache.pulsar.io.kafka.KafkaSourceConfig
+sinkConfigClass: org.apache.pulsar.io.kafka.KafkaSinkConfig
diff --git a/pulsar-io/kinesis/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kinesis/src/main/resources/META-INF/services/pulsar-io.yaml
index 3114f4c..7a1e3b4 100644
--- a/pulsar-io/kinesis/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/kinesis/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -21,3 +21,5 @@ name: kinesis
description: Kinesis connectors
sinkClass: org.apache.pulsar.io.kinesis.KinesisSink
sourceClass: org.apache.pulsar.io.kinesis.KinesisSource
+sourceConfigClass: org.apache.pulsar.io.kinesis.KinesisSourceConfig
+sinkConfigClass: org.apache.pulsar.io.kinesis.KinesisSinkConfig
diff --git a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
index bc32292..4fab476 100644
--- a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,5 @@ name: mongo
description: MongoDB source and sink connector
sinkClass: org.apache.pulsar.io.mongodb.MongoSink
sourceClass: org.apache.pulsar.io.mongodb.MongoSource
+sourceConfigClass: org.apache.pulsar.io.mongodb.MongoConfig
+sinkConfigClass: org.apache.pulsar.io.mongodb.MongoConfig
diff --git a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
index 22c4af4..f518e56 100644
--- a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: netty
description: Netty Tcp or Udp Source Connector
sourceClass: org.apache.pulsar.io.netty.NettySource
+sourceConfigClass: org.apache.pulsar.io.netty.NettySourceConfig
diff --git a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
index be65345..d0a4d1e 100644
--- a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,4 +20,6 @@
name: rabbitmq
description: RabbitMQ source and sink connector
sourceClass: org.apache.pulsar.io.rabbitmq.RabbitMQSource
-sinkClass: org.apache.pulsar.io.rabbitmq.RabbitMQSink
\ No newline at end of file
+sinkClass: org.apache.pulsar.io.rabbitmq.RabbitMQSink
+sourceConfigClass: org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig
+sinkConfigClass: org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig
\ No newline at end of file
diff --git a/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml
index 39a8629..051ec40 100644
--- a/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,3 +19,4 @@
name: redis
description: Writes data into Redis
sinkClass: org.apache.pulsar.io.redis.sink.RedisSink
+sinkConfigClass: org.apache.pulsar.io.redis.sink.RedisSinkConfig
diff --git a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
index fdf223a..26347f5 100644
--- a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: solr
description: Writes data into solr collection
sinkClass: org.apache.pulsar.io.solr.SolrGenericRecordSink
+sinkConfigClass: org.apache.pulsar.io.solr.SolrSinkConfig
diff --git a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
index cc814be..39d7d97 100644
--- a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: twitter
description: Ingest data from Twitter firehose
sourceClass: org.apache.pulsar.io.twitter.TwitterFireHose
+sourceConfigClass: org.apache.pulsar.io.twitter.TwitterFireHoseConfig