You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/22 15:24:30 UTC

[pulsar] branch master updated: Added ability to add annotations to Connector Configs (#6983)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c8170b7  Added ability to add annotations to Connector Configs  (#6983)
c8170b7 is described below

commit c8170b7c9f0232c9a005b65b6b146e0417ec3190
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 22 08:24:13 2020 -0700

    Added ability to add annotations to Connector Configs  (#6983)
    
    * Added sourceConfigClass and sinkConfigClass
    
    * Add Validator annotation helpers to validate class parameters
    
    * Fix build errors
    
    * Take feedback into account
    
    * Connected with validation
    
    * Fix bugs
    
    * Added tests
    
    * Fix class name
    
    * Address feedback
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../pulsar/common/io/ConnectorDefinition.java      | 22 ++++++++++
 .../org/apache/pulsar/functions/LocalRunner.java   |  8 ++--
 .../pulsar/functions/worker/WorkerConfig.java      |  5 +++
 .../pulsar/functions/utils/SinkConfigUtils.java    | 37 ++++++++++++++++-
 .../pulsar/functions/utils/SourceConfigUtils.java  | 34 ++++++++++++++-
 .../pulsar/functions/utils/io/ConnectorUtils.java  | 16 ++++----
 .../functions/utils/SinkConfigUtilsTest.java       | 47 ++++++++++++++++++++-
 .../functions/utils/SourceConfigUtilsTest.java     | 48 +++++++++++++++++++++-
 .../functions/worker/rest/api/SinksImpl.java       |  3 +-
 .../functions/worker/rest/api/SourcesImpl.java     |  3 +-
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  3 +-
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  2 +
 .../resources/META-INF/services/pulsar-io.yaml     |  2 +
 .../resources/META-INF/services/pulsar-io.yaml     |  2 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  4 +-
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 29 files changed, 229 insertions(+), 21 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java
index d1bb334..99c3738 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java
@@ -51,4 +51,26 @@ public class ConnectorDefinition {
      * <p>If not defined, it will be assumed this connector cannot act as a data sink.
      */
     private String sinkClass;
+
+    /**
+     * The class name for the source config implementation.
+     * Most sources use a config class for managing their config
+     * and directly convert the supplied Map object to this object at open time.
+     * These connectors can declare their config class in this variable, which allows
+     * the framework to validate config parameters at submission time.
+     *
+     * <p>If not defined, the framework will not be able to do any submission time checks.
+     */
+    private String sourceConfigClass;
+
+    /**
+     * The class name for the sink config implementation.
+     * Most sinks use a config class for managing their config
+     * and directly convert the supplied Map object to this object at open time.
+     * These connectors can declare their config class in this variable, which allows
+     * the framework to validate config parameters at submission time.
+     *
+     * <p>If not defined, the framework will not be able to do any submission time checks.
+     */
+    private String sinkConfigClass;
 }
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 4f63bd2..3170e6e 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -257,14 +257,14 @@ public class LocalRunner {
 
                 if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                     File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
+                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory, true));
 
                 } else {
                     File file = new File(userCodeFile);
                     if (!file.exists()) {
                         throw new RuntimeException("Source archive (" + userCodeFile + ") does not exist");
                     }
-                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
+                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory, true));
                 }
             } else if (sinkConfig != null) {
                 inferMissingArguments(sinkConfig);
@@ -285,13 +285,13 @@ public class LocalRunner {
 
                 if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                     File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
+                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory, true));
                 } else {
                     File file = new File(userCodeFile);
                     if (!file.exists()) {
                         throw new RuntimeException("Sink archive does not exist");
                     }
-                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
+                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory,  true));
                 }
             } else {
                 throw new IllegalArgumentException("Must specify Function, Source or Sink config");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 4fd21f0..b1cf7cb 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -141,6 +141,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
     @FieldContext(
+            category = CATEGORY_CONNECTORS,
+            doc = "Should we validate connector config during submission"
+    )
+    private Boolean validateConnectorConfig = false;
+    @FieldContext(
         category = CATEGORY_FUNC_METADATA_MNG,
         doc = "The pulsar topic used for storing function metadata"
     )
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 57aa8ec..7bb9f77 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.functions.utils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import lombok.AllArgsConstructor;
@@ -31,9 +32,13 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.validator.ConfigValidation;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -302,7 +307,8 @@ public class SinkConfigUtils {
     }
 
     public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath,
-                                          File sinkPackageFile, String narExtractionDirectory) {
+                                                File sinkPackageFile, String narExtractionDirectory,
+                                                boolean validateConnectorConfig) {
         if (isEmpty(sinkConfig.getTenant())) {
             throw new IllegalArgumentException("Sink tenant cannot be null");
         }
@@ -374,6 +380,9 @@ public class SinkConfigUtils {
             } catch (IOException e) {
                 throw new IllegalArgumentException("Failed to extract Sink class from archive", e);
             }
+            if (validateConnectorConfig) {
+                validateConnectorConfig(sinkConfig, (NarClassLoader)  narClassLoader);
+            }
             try {
                 typeArg = getSinkType(sinkClassName, narClassLoader);
                 classLoader = narClassLoader;
@@ -398,12 +407,18 @@ public class SinkConfigUtils {
                             throw new IllegalArgumentException(
                                     String.format("Sink class %s must be in class path", sinkClassName), e1);
                         }
+                        if (validateConnectorConfig) {
+                            validateConnectorConfig(sinkConfig, (NarClassLoader)  narClassLoader);
+                        }
                     } else {
                         throw new IllegalArgumentException(
                                 String.format("Sink class %s must be in class path", sinkClassName), e);
                     }
                 }
             } else if (narClassLoader != null) {
+                if (validateConnectorConfig) {
+                    validateConnectorConfig(sinkConfig, (NarClassLoader)  narClassLoader);
+                }
                 try {
                     typeArg = getSinkType(sinkClassName, narClassLoader);
                     classLoader = narClassLoader;
@@ -580,4 +595,24 @@ public class SinkConfigUtils {
         }
         return mergedConfig;
     }
+
+    public static void validateConnectorConfig(SinkConfig sinkConfig, ClassLoader classLoader) {
+        try {
+            ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(classLoader);
+            if (defn.getSinkConfigClass() != null) {
+                Class configClass = Class.forName(defn.getSinkConfigClass(),  true, classLoader);
+                Object configObject =
+                        ObjectMapperFactory.getThreadLocal().convertValue(sinkConfig.getConfigs(), configClass);
+                if (configObject != null) {
+                    ConfigValidation.validateConfig(configObject);
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error validating sink config", e);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Could not find sink config class", e);
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Could not validate sink config: " + e.getMessage());
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index b184ef0..38939a6 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.functions.utils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import lombok.AllArgsConstructor;
@@ -28,10 +29,12 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.validator.ConfigValidation;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -209,7 +212,8 @@ public class SourceConfigUtils {
     }
 
     public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath,
-                                                  File sourcePackageFile, String narExtractionDirectory) {
+                                                  File sourcePackageFile, String narExtractionDirectory,
+                                                  boolean validateConnectorConfig) {
         if (isEmpty(sourceConfig.getTenant())) {
             throw new IllegalArgumentException("Source tenant cannot be null");
         }
@@ -268,6 +272,9 @@ public class SourceConfigUtils {
             } catch (IOException e) {
                 throw new IllegalArgumentException("Failed to extract source class from archive", e);
             }
+            if (validateConnectorConfig) {
+                validateConnectorConfig(sourceConfig, (NarClassLoader)  narClassLoader);
+            }
             try {
                 typeArg = getSourceType(sourceClassName, narClassLoader);
                 classLoader = narClassLoader;
@@ -292,12 +299,18 @@ public class SourceConfigUtils {
                             throw new IllegalArgumentException(
                                     String.format("Source class %s must be in class path", sourceClassName), e1);
                         }
+                        if (validateConnectorConfig) {
+                            validateConnectorConfig(sourceConfig, (NarClassLoader)  narClassLoader);
+                        }
                     } else {
                         throw new IllegalArgumentException(
                                 String.format("Source class %s must be in class path", sourceClassName), e);
                     }
                 }
             } else if (narClassLoader != null) {
+                if (validateConnectorConfig) {
+                    validateConnectorConfig(sourceConfig, (NarClassLoader)  narClassLoader);
+                }
                 try {
                     typeArg = getSourceType(sourceClassName, narClassLoader);
                     classLoader = narClassLoader;
@@ -386,4 +399,23 @@ public class SourceConfigUtils {
         return mergedConfig;
     }
 
+    public static void validateConnectorConfig(SourceConfig sourceConfig, ClassLoader classLoader) {
+        try {
+            ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(classLoader);
+            if (defn.getSourceConfigClass() != null) {
+                Class configClass = Class.forName(defn.getSourceConfigClass(), true, classLoader);
+                Object configObject = ObjectMapperFactory.getThreadLocal().convertValue(sourceConfig.getConfigs(), configClass);
+                if (configObject != null) {
+                    ConfigValidation.validateConfig(configObject);
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error validating source config", e);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Could not find source config class");
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Could not validate source config: " + e.getMessage());
+        }
+    }
+
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index a78c24c..e96afdd 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -74,11 +74,8 @@ public class ConnectorUtils {
      * Extract the Pulsar IO Sink class from a connector archive.
      */
     public static String getIOSinkClass(ClassLoader classLoader) throws IOException {
+        ConnectorDefinition conf = getConnectorDefinition(classLoader);
         NarClassLoader ncl = (NarClassLoader) classLoader;
-        String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
-        ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
-                ConnectorDefinition.class);
         if (StringUtils.isEmpty(conf.getSinkClass())) {
             throw new IOException(
                     String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
@@ -100,12 +97,17 @@ public class ConnectorUtils {
 
     public static ConnectorDefinition getConnectorDefinition(String narPath, String narExtractionDirectory) throws IOException {
         try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
-            String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
-            return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
+            return getConnectorDefinition(ncl);
         }
     }
 
+    public static ConnectorDefinition getConnectorDefinition(ClassLoader classLoader) throws IOException {
+        NarClassLoader narClassLoader = (NarClassLoader) classLoader; // plain downcast: a loader that is not a NarClassLoader fails with ClassCastException
+        String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+        return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class); // parsed with the YAML mapper into a ConnectorDefinition
+    }
+
     public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException {
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 47515d6..8e9df03 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -19,12 +19,22 @@
 package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockTestCase;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -33,12 +43,26 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
-import static org.testng.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.*;
 
 /**
  * Unit test of {@link Reflections}.
  */
-public class SinkConfigUtilsTest {
+@PrepareForTest(ConnectorUtils.class)
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "javax.xml.*", "org.xml.*", "org.w3c.dom.*", "org.springframework.context.*", "org.apache.log4j.*", "com.sun.org.apache.xerces.*", "javax.management.*" })
+public class SinkConfigUtilsTest extends PowerMockTestCase {
+
+    private ConnectorDefinition defn;
+
+    @Data
+    @Accessors(chain = true)
+    @NoArgsConstructor
+    public static class TestSinkConfig {
+        @ConfigValidationAnnotations.NotNull
+        private String configParameter;
+    }
 
     @Test
     public void testConvertBackFidelity() throws IOException  {
@@ -279,6 +303,25 @@ public class SinkConfigUtilsTest {
         );
     }
 
+    @Test
+    public void testValidateConfig() throws IOException {
+        mockStatic(ConnectorUtils.class);
+        defn = new ConnectorDefinition();
+        defn.setSinkConfigClass(TestSinkConfig.class.getName());
+        PowerMockito.when(ConnectorUtils.getConnectorDefinition(any())).thenReturn(defn);
+
+        SinkConfig sinkConfig = createSinkConfig();
+
+        // Good config
+        sinkConfig.getConfigs().put("configParameter", "Test");
+        SinkConfigUtils.validateConnectorConfig(sinkConfig, Thread.currentThread().getContextClassLoader());
+
+        // Bad config
+        sinkConfig.getConfigs().put("configParameter", null);
+        Exception e = expectThrows(IllegalArgumentException.class, () -> SinkConfigUtils.validateConnectorConfig(sinkConfig, Thread.currentThread().getContextClassLoader()));
+        assertTrue(e.getMessage().contains("Could not validate sink config: Field 'configParameter' cannot be null!"));
+    }
+
     private SinkConfig createSinkConfig() {
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setTenant("test-tenant");
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 3c6ee73..4c80b23 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -19,11 +19,22 @@
 package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockTestCase;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -32,12 +43,26 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
-import static org.testng.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.*;
 
 /**
  * Unit test of {@link Reflections}.
  */
-public class SourceConfigUtilsTest {
+@PrepareForTest(ConnectorUtils.class)
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "javax.xml.*", "org.xml.*", "org.w3c.dom.*", "org.springframework.context.*", "org.apache.log4j.*", "com.sun.org.apache.xerces.*", "javax.management.*" })
+public class SourceConfigUtilsTest extends PowerMockTestCase {
+
+    private ConnectorDefinition defn;
+
+    @Data
+    @Accessors(chain = true)
+    @NoArgsConstructor
+    public static class TestSourceConfig {
+        @ConfigValidationAnnotations.NotNull
+        private String configParameter;
+    }
 
     @Test
     public void testConvertBackFidelity() throws IOException  {
@@ -213,6 +238,25 @@ public class SourceConfigUtilsTest {
         );
     }
 
+    @Test
+    public void testValidateConfig() throws IOException {
+        mockStatic(ConnectorUtils.class);
+        defn = new ConnectorDefinition();
+        defn.setSourceConfigClass(SourceConfigUtilsTest.TestSourceConfig.class.getName());
+        PowerMockito.when(ConnectorUtils.getConnectorDefinition(any())).thenReturn(defn);
+
+        SourceConfig sourceConfig = createSourceConfig();
+
+        // Good config
+        sourceConfig.getConfigs().put("configParameter", "Test");
+        SourceConfigUtils.validateConnectorConfig(sourceConfig, Thread.currentThread().getContextClassLoader());
+
+        // Bad config
+        sourceConfig.getConfigs().put("configParameter", null);
+        Exception e = expectThrows(IllegalArgumentException.class, () -> SourceConfigUtils.validateConnectorConfig(sourceConfig, Thread.currentThread().getContextClassLoader()));
+        assertTrue(e.getMessage().contains("Could not validate source config: Field 'configParameter' cannot be null!"));
+    }
+
     private SourceConfig createSourceConfig() {
         SourceConfig sourceConfig = new SourceConfig();
         sourceConfig.setTenant("test-tenant");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 560c8cb..2a0d3c3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -689,7 +689,8 @@ public class SinksImpl extends ComponentImpl {
             }
         }
         SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath,
-                componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+                componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory(),
+                worker().getWorkerConfig().getValidateConnectorConfig());
         return SinkConfigUtils.convert(sinkConfig, sinkDetails);
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 4f434fa..97c22eb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -685,7 +685,8 @@ public class SourcesImpl extends ComponentImpl {
             }
         }
         SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath,
-                sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+                sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory(),
+                worker().getWorkerConfig().getValidateConnectorConfig());
         return SourceConfigUtils.convert(sourceConfig, sourceDetails);
     }
 }
diff --git a/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
index f2a7ab5..d8f20ec 100644
--- a/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: aerospike
 description: Aerospike database sink
 sinkClass: org.apache.pulsar.io.aerospike.AerospikeStringSink
+sinkConfigClass: org.apache.pulsar.io.aerospike.AerospikeSinkConfig
diff --git a/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
index 2ecf4df..6ea58b0 100644
--- a/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: canal
 description: canal source and read data from mysql
 sourceClass: org.apache.pulsar.io.canal.CanalStringSource
+sourceConfigClass: org.apache.pulsar.io.canal.CanalSourceConfig
diff --git a/pulsar-io/cassandra/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/cassandra/src/main/resources/META-INF/services/pulsar-io.yaml
index 0ceb7442..b486321 100644
--- a/pulsar-io/cassandra/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/cassandra/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: cassandra
 description: Writes data into Cassandra
 sinkClass: org.apache.pulsar.io.cassandra.CassandraStringSink
+sinkConfigClass: org.apache.pulsar.io.cassandra.CassandraSinkConfig
diff --git a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
index 97789e9..0603290 100644
--- a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: elastic_search
 description: Writes data into Elastic Search
 sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
+sinkConfigClass: org.apache.pulsar.io.elasticsearch.ElasticSearchConfig
diff --git a/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
index df455c2..4651054 100644
--- a/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: file
 description: Reads data from local filesystem
 sourceClass: org.apache.pulsar.io.file.FileSource
+sourceConfigClass: org.apache.pulsar.io.file.FileSourceConfig
diff --git a/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
index 0e578b8..580e528 100644
--- a/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/flume/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,4 +19,5 @@
 name: flume
 description: flume source and sink connector
 sourceClass: org.apache.pulsar.io.flume.source.StringSource
-sinkClass: org.apache.pulsar.io.flume.sink.StringSink
\ No newline at end of file
+sinkClass: org.apache.pulsar.io.flume.sink.StringSink
+sinkConfigClass: org.apache.pulsar.io.flume.FlumeConfig
diff --git a/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
index e0082d2..b2dff63 100644
--- a/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/hbase/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: hbase
 description: Writes data into hbase table
 sinkClass: org.apache.pulsar.io.hbase.sink.HbaseGenericRecordSink
+sinkConfigClass: org.apache.pulsar.io.hbase.sink.HbaseSinkConfig
diff --git a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
index 92dd642..8bc3395 100644
--- a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: hdfs2
 description: Writes data into HDFS 2.x
 sinkClass: org.apache.pulsar.io.hdfs2.sink.text.HdfsStringSink
+sinkConfigClass: org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig
diff --git a/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml
index bb672b3..58c7109 100644
--- a/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: hdfs3
 description: Writes data into HDFS 3.x
 sinkClass: org.apache.pulsar.io.hdfs3.sink.text.HdfsStringSink
+sinkConfigClass: org.apache.pulsar.io.hdfs3.sink.HdfsSinkConfig
diff --git a/pulsar-io/influxdb/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/influxdb/src/main/resources/META-INF/services/pulsar-io.yaml
index d97451d..28e046d 100644
--- a/pulsar-io/influxdb/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/influxdb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: influxdb
 description: Writes data into InfluxDB database
 sinkClass: org.apache.pulsar.io.influxdb.InfluxDBGenericRecordSink
+sinkConfigClass: org.apache.pulsar.io.influxdb.v2.InfluxDBSinkConfig
diff --git a/pulsar-io/jdbc/sqlite/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/sqlite/src/main/resources/META-INF/services/pulsar-io.yaml
index a1b9afd..8cd2e12 100644
--- a/pulsar-io/jdbc/sqlite/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/jdbc/sqlite/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: jdbc-sqlite
 description: JDBC sink for SQLite
 sinkClass: org.apache.pulsar.io.jdbc.SqliteJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
diff --git a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
index c3fd86d..b718587 100644
--- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -21,3 +21,5 @@ name: kafka
 description: Kafka source and sink connector
 sourceClass: org.apache.pulsar.io.kafka.KafkaBytesSource
 sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink
+sourceConfigClass: org.apache.pulsar.io.kafka.KafkaSourceConfig
+sinkConfigClass: org.apache.pulsar.io.kafka.KafkaSinkConfig
diff --git a/pulsar-io/kinesis/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kinesis/src/main/resources/META-INF/services/pulsar-io.yaml
index 3114f4c..7a1e3b4 100644
--- a/pulsar-io/kinesis/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/kinesis/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -21,3 +21,5 @@ name: kinesis
 description: Kinesis connectors
 sinkClass: org.apache.pulsar.io.kinesis.KinesisSink
 sourceClass: org.apache.pulsar.io.kinesis.KinesisSource
+sourceConfigClass: org.apache.pulsar.io.kinesis.KinesisSourceConfig
+sinkConfigClass: org.apache.pulsar.io.kinesis.KinesisSinkConfig
diff --git a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
index bc32292..4fab476 100644
--- a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,5 @@ name: mongo
 description: MongoDB source and sink connector
 sinkClass: org.apache.pulsar.io.mongodb.MongoSink
 sourceClass: org.apache.pulsar.io.mongodb.MongoSource
+sourceConfigClass: org.apache.pulsar.io.mongodb.MongoConfig
+sinkConfigClass: org.apache.pulsar.io.mongodb.MongoConfig
diff --git a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
index 22c4af4..f518e56 100644
--- a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: netty
 description: Netty Tcp or Udp Source Connector
 sourceClass: org.apache.pulsar.io.netty.NettySource
+sourceConfigClass: org.apache.pulsar.io.netty.NettySourceConfig
diff --git a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
index be65345..d0a4d1e 100644
--- a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,4 +20,6 @@
 name: rabbitmq
 description: RabbitMQ source and sink connector
 sourceClass: org.apache.pulsar.io.rabbitmq.RabbitMQSource
-sinkClass: org.apache.pulsar.io.rabbitmq.RabbitMQSink
\ No newline at end of file
+sinkClass: org.apache.pulsar.io.rabbitmq.RabbitMQSink
+sourceConfigClass: org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig
+sinkConfigClass: org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig
\ No newline at end of file
diff --git a/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml
index 39a8629..051ec40 100644
--- a/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,3 +19,4 @@
 name: redis
 description: Writes data into Redis
 sinkClass: org.apache.pulsar.io.redis.sink.RedisSink
+sinkConfigClass: org.apache.pulsar.io.redis.sink.RedisSinkConfig
diff --git a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
index fdf223a..26347f5 100644
--- a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: solr
 description: Writes data into solr collection
 sinkClass: org.apache.pulsar.io.solr.SolrGenericRecordSink
+sinkConfigClass: org.apache.pulsar.io.solr.SolrSinkConfig
diff --git a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
index cc814be..39d7d97 100644
--- a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: twitter
 description: Ingest data from Twitter firehose
 sourceClass: org.apache.pulsar.io.twitter.TwitterFireHose
+sourceConfigClass: org.apache.pulsar.io.twitter.TwitterFireHoseConfig