You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/02 01:19:14 UTC

[pulsar] branch master updated: Add a new api to get information about config definition of builtin sources/sinks (#7114)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f0b6934  Add a new api to get information about config definition of builtin sources/sinks (#7114)
f0b6934 is described below

commit f0b6934dd563886bc2f7926fb3724784edfe74f2
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Jun 1 18:19:01 2020 -0700

    Add a new api to get information about config definition of builtin sources/sinks (#7114)
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../apache/pulsar/broker/admin/impl/SinksBase.java | 30 +++++++++----
 .../pulsar/broker/admin/impl/SourcesBase.java      | 30 +++++++++----
 .../pulsar/common/io/ConfigFieldDefinition.java    | 33 +++++++++-----
 .../org/apache/pulsar/common/util/Reflections.java | 15 +++++++
 .../pulsar/functions/utils/io/ConnectorUtils.java  | 46 +++++++++++++++++++-
 .../pulsar/functions/utils/io/Connectors.java      |  3 ++
 .../pulsar/functions/worker/ConnectorsManager.java |  9 ++++
 .../functions/worker/rest/api/SinksImpl.java       | 29 +++++++++++--
 .../functions/worker/rest/api/SourcesImpl.java     | 29 +++++++++++--
 .../worker/rest/api/v3/SinksApiV3Resource.java     | 50 ++++++++++++++++------
 .../worker/rest/api/v3/SourcesApiV3Resource.java   | 50 ++++++++++++++++------
 pulsar-io/common/pom.xml                           |  5 +++
 .../org/apache/pulsar/io/common/IOConfigUtils.java | 17 +-------
 13 files changed, 271 insertions(+), 75 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index 61d3603..9f69e6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.SinkStatus;
@@ -478,14 +479,27 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     })
     @Path("/builtinsinks")
     public List<ConnectorDefinition> getSinkList() {
-        List<ConnectorDefinition> connectorDefinitions = sink.getListOfConnectors();
-        List<ConnectorDefinition> retval = new ArrayList<>();
-        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
-            if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
-                retval.add(connectorDefinition);
-            }
-        }
-        return retval;
+        return sink.getSinkList();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about config fields associated with the specified builtin sink",
+            response = ConfigFieldDefinition.class,
+            responseContainer = "List"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "builtin sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/builtinsinks/{name}/configdefinition")
+    public List<ConfigFieldDefinition> getSinkConfigDefinition(
+            @ApiParam(value = "The name of the builtin sink")
+            final @PathParam("name") String name) throws IOException {
+        return sink.getSinkConfigDefinition(name);
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index aba9069..c25bcdd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.SourceStatus;
@@ -468,14 +469,27 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/builtinsources")
     public List<ConnectorDefinition> getSourceList() {
-        List<ConnectorDefinition> connectorDefinitions = source.getListOfConnectors();
-        List<ConnectorDefinition> retval = new ArrayList<>();
-        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
-            if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
-                retval.add(connectorDefinition);
-            }
-        }
-        return retval;
+        return source.getSourceList();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about config fields associated with the specified builtin source",
+            response = ConfigFieldDefinition.class,
+            responseContainer = "List"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "builtin source does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/builtinsources/{name}/configdefinition")
+    public List<ConfigFieldDefinition> getSourceConfigDefinition(
+            @ApiParam(value = "The name of the builtin source")
+            final @PathParam("name") String name) throws IOException {
+        return source.getSourceConfigDefinition(name);
     }
 
     @POST
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConfigFieldDefinition.java
similarity index 65%
copy from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/io/ConfigFieldDefinition.java
index a5d7e61..2d242a0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConfigFieldDefinition.java
@@ -16,21 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.utils.io;
+package org.apache.pulsar.common.io;
 
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
-import org.apache.pulsar.common.io.ConnectorDefinition;
-
+/**
+ * Information about a Pulsar connector config field.
+ */
 @Data
-public class Connectors {
-    final List<ConnectorDefinition> connectors = new ArrayList<>();
-    final Map<String, Path> sources = new TreeMap<>();
-    final Map<String, Path> sinks = new TreeMap<>();
+@NoArgsConstructor
+public class ConfigFieldDefinition {
+
+    /**
+     * The name of the field.
+     */
+    private String fieldName;
+
+    /**
+     * The field type.
+     */
+    private String typeName;
+
+    /**
+     * Other attribute pairs associated with the field.
+     */
+    private Map<String, String> attributes;
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
index e1713ba..b0045d0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.common.util;
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URLClassLoader;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -304,4 +308,15 @@ public class Reflections {
             }
         }
     }
+
+    public static List<Field> getAllFields(Class<?> type) {
+        List<Field> fields = new LinkedList<>();
+        fields.addAll(Arrays.asList(type.getDeclaredFields()));
+
+        if (type.getSuperclass() != null) {
+            fields.addAll(getAllFields(type.getSuperclass()));
+        }
+
+        return fields;
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index e96afdd..fcddf5c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -20,22 +20,28 @@ package org.apache.pulsar.functions.utils.io;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collections;
+import java.util.*;
 
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.Exceptions;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @UtilityClass
 @Slf4j
@@ -108,6 +114,38 @@ public class ConnectorUtils {
         return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
     }
 
+    public static List<ConfigFieldDefinition> getConnectorConfigDefinition(String narPath,
+                                                                           String configClassName,
+                                                                           String narExtractionDirectory) throws Exception {
+        List<ConfigFieldDefinition> retval = new LinkedList<>();
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
+            Class configClass = ncl.loadClass(configClassName);
+            for (Field field : Reflections.getAllFields(configClass)) {
+                if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+                    // We dont want static fields
+                    continue;
+                }
+                field.setAccessible(true);
+                ConfigFieldDefinition configFieldDefinition = new ConfigFieldDefinition();
+                configFieldDefinition.setFieldName(field.getName());
+                configFieldDefinition.setTypeName(field.getType().getName());
+                Map<String, String> attributes = new HashMap<>();
+                for (Annotation annotation : field.getAnnotations()) {
+                    if (annotation.annotationType().equals(FieldDoc.class)) {
+                        FieldDoc fieldDoc = (FieldDoc) annotation;
+                        for (Method method : FieldDoc.class.getDeclaredMethods()) {
+                            Object value = method.invoke(fieldDoc);
+                            attributes.put(method.getName(), value == null ? "" : value.toString());
+                        }
+                    }
+                }
+                configFieldDefinition.setAttributes(attributes);
+                retval.add(configFieldDefinition);
+            }
+        }
+        return retval;
+    }
+
     public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException {
         Path path = Paths.get(connectorsDirectory).toAbsolutePath();
         log.info("Searching for connectors in {}", path);
@@ -127,10 +165,16 @@ public class ConnectorUtils {
 
                     if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
                         connectors.sources.put(cntDef.getName(), archive);
+                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
+                            connectors.sourceConfigDefinitions.put(cntDef.getName(), getConnectorConfigDefinition(archive.toString(), cntDef.getSourceConfigClass(), narExtractionDirectory));
+                        }
                     }
 
                     if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
                         connectors.sinks.put(cntDef.getName(), archive);
+                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
+                            connectors.sinkConfigDefinitions.put(cntDef.getName(), getConnectorConfigDefinition(archive.toString(), cntDef.getSinkConfigClass(), narExtractionDirectory));
+                        }
                     }
 
                     connectors.connectors.add(cntDef);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
index a5d7e61..d23dabd 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
@@ -26,11 +26,14 @@ import java.util.TreeMap;
 
 import lombok.Data;
 
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 
 @Data
 public class Connectors {
     final List<ConnectorDefinition> connectors = new ArrayList<>();
     final Map<String, Path> sources = new TreeMap<>();
+    final Map<String, List<ConfigFieldDefinition>> sourceConfigDefinitions = new TreeMap<>();
     final Map<String, Path> sinks = new TreeMap<>();
+    final Map<String, List<ConfigFieldDefinition>> sinkConfigDefinitions = new TreeMap<>();
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 40156eb..ca61744 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
@@ -42,6 +43,14 @@ public class ConnectorsManager {
         return connectors.getSources().get(sourceType);
     }
 
+    public List<ConfigFieldDefinition> getSourceConfigDefinition(String sourceType) {
+        return connectors.getSourceConfigDefinitions().get(sourceType);
+    }
+
+    public List<ConfigFieldDefinition> getSinkConfigDefinition(String sinkType) {
+        return connectors.getSinkConfigDefinitions().get(sinkType);
+    }
+
     public Path getSinkArchive(String sinkType) {
         return connectors.getSinks().get(sinkType);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 2a0d3c3..07617f4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -26,6 +26,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
+import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
@@ -49,10 +51,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.file.Path;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.function.Supplier;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -665,6 +664,28 @@ public class SinksImpl extends ComponentImpl {
         return config;
     }
 
+    public List<ConnectorDefinition> getSinkList() {
+        List<ConnectorDefinition> connectorDefinitions = getListOfConnectors();
+        List<ConnectorDefinition> retval = new ArrayList<>();
+        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+            if (!org.apache.commons.lang.StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
+                retval.add(connectorDefinition);
+            }
+        }
+        return retval;
+    }
+
+    public List<ConfigFieldDefinition> getSinkConfigDefinition(String name) {
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+        List<ConfigFieldDefinition> retval = this.worker().getConnectorsManager().getSinkConfigDefinition(name);
+        if (retval == null) {
+            throw new RestException(Response.Status.NOT_FOUND, "builtin sink does not exist");
+        }
+        return retval;
+    }
+
     private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
                                                                  final String namespace,
                                                                  final String sinkName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 97c22eb..659e817 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -26,6 +26,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
+import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
@@ -49,10 +51,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.file.Path;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.function.Supplier;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -661,6 +660,28 @@ public class SourcesImpl extends ComponentImpl {
         return config;
     }
 
+    public List<ConnectorDefinition> getSourceList() {
+        List<ConnectorDefinition> connectorDefinitions = getListOfConnectors();
+        List<ConnectorDefinition> retval = new ArrayList<>();
+        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+            if (!org.apache.commons.lang.StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
+                retval.add(connectorDefinition);
+            }
+        }
+        return retval;
+    }
+
+    public List<ConfigFieldDefinition> getSourceConfigDefinition(String name) {
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+        List<ConfigFieldDefinition> retval = this.worker().getConnectorsManager().getSourceConfigDefinition(name);
+        if (retval == null) {
+            throw new RestException(Response.Status.NOT_FOUND, "builtin source does not exist");
+        }
+        return retval;
+    }
+
     private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
                                                                  final String namespace,
                                                                  final String sourceName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
index fe48afa..8ee1662 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
@@ -18,13 +18,11 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.SinkStatus;
@@ -240,13 +238,41 @@ public class SinksApiV3Resource extends FunctionApiResource {
     @GET
     @Path("/builtinsinks")
     public List<ConnectorDefinition> getSinkList() {
-        List<ConnectorDefinition> connectorDefinitions = sink.getListOfConnectors();
-        List<ConnectorDefinition> retVal = new ArrayList<>();
-        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
-            if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
-                retVal.add(connectorDefinition);
-            }
-        }
-        return retVal;
+        return sink.getSinkList();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about config fields associated with the specified builtin sink",
+            response = ConfigFieldDefinition.class,
+            responseContainer = "List"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "builtin sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/builtinsinks/{name}/configdefinition")
+    public List<ConfigFieldDefinition> getSinkConfigDefinition(
+            @ApiParam(value = "The name of the builtin sink")
+            final @PathParam("name") String name) throws IOException {
+        return sink.getSinkConfigDefinition(name);
+    }
+
+    @POST
+    @ApiOperation(
+            value = "Reload the built-in connectors, including Sources and Sinks",
+            response = Void.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "This operation requires super-user access"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/reloadBuiltInSinks")
+    public void reloadSinks() {
+        sink.reloadConnectors(clientAppId());
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
index 5dcd18c..6b3daa9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
@@ -18,13 +18,11 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.SourceStatus;
@@ -256,13 +254,41 @@ public class SourcesApiV3Resource extends FunctionApiResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/builtinsources")
     public List<ConnectorDefinition> getSourceList() {
-        List<ConnectorDefinition> connectorDefinitions = source.getListOfConnectors();
-        List<ConnectorDefinition> retval = new ArrayList<>();
-        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
-            if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
-                retval.add(connectorDefinition);
-            }
-        }
-        return retval;
+        return source.getSourceList();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about config fields associated with the specified builtin source",
+            response = ConfigFieldDefinition.class,
+            responseContainer = "List"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "builtin source does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/builtinsources/{name}/configdefinition")
+    public List<ConfigFieldDefinition> getSourceConfigDefinition(
+            @ApiParam(value = "The name of the builtin source")
+            final @PathParam("name") String name) throws IOException {
+        return source.getSourceConfigDefinition(name);
+    }
+
+    @POST
+    @ApiOperation(
+            value = "Reload the built-in connectors, including Sources and Sinks",
+            response = Void.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "This operation requires super-user access"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/reloadBuiltInSources")
+    public void reloadSources() {
+        source.reloadConnectors(clientAppId());
     }
 }
diff --git a/pulsar-io/common/pom.xml b/pulsar-io/common/pom.xml
index e48d4d9..8fb2129 100644
--- a/pulsar-io/common/pom.xml
+++ b/pulsar-io/common/pom.xml
@@ -40,6 +40,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
index 8066ea6..efb72e8 100644
--- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
+++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
@@ -20,16 +20,14 @@ package org.apache.pulsar.io.common;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
@@ -44,21 +42,10 @@ public class IOConfigUtils {
     }
 
 
-    public static List<Field> getAllFields(Class<?> type) {
-        List<Field> fields = new LinkedList<>();
-        fields.addAll(Arrays.asList(type.getDeclaredFields()));
-
-        if (type.getSuperclass() != null) {
-            fields.addAll(getAllFields(type.getSuperclass()));
-        }
-
-        return fields;
-    }
-
     private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, Function<String, String> secretsGetter) {
         Map<String, Object> configs = new HashMap<>(map);
 
-        for (Field field : getAllFields(clazz)) {
+        for (Field field : Reflections.getAllFields(clazz)) {
             field.setAccessible(true);
             for (Annotation annotation : field.getAnnotations()) {
                 if (annotation.annotationType().equals(FieldDoc.class)) {