You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/02 01:19:14 UTC
[pulsar] branch master updated: Add a new api to get information about config definition of builtin sources/sinks (#7114)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f0b6934 Add a new api to get information about config definition of builtin sources/sinks (#7114)
f0b6934 is described below
commit f0b6934dd563886bc2f7926fb3724784edfe74f2
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Jun 1 18:19:01 2020 -0700
Add a new api to get information about config definition of builtin sources/sinks (#7114)
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../apache/pulsar/broker/admin/impl/SinksBase.java | 30 +++++++++----
.../pulsar/broker/admin/impl/SourcesBase.java | 30 +++++++++----
.../pulsar/common/io/ConfigFieldDefinition.java | 33 +++++++++-----
.../org/apache/pulsar/common/util/Reflections.java | 15 +++++++
.../pulsar/functions/utils/io/ConnectorUtils.java | 46 +++++++++++++++++++-
.../pulsar/functions/utils/io/Connectors.java | 3 ++
.../pulsar/functions/worker/ConnectorsManager.java | 9 ++++
.../functions/worker/rest/api/SinksImpl.java | 29 +++++++++++--
.../functions/worker/rest/api/SourcesImpl.java | 29 +++++++++++--
.../worker/rest/api/v3/SinksApiV3Resource.java | 50 ++++++++++++++++------
.../worker/rest/api/v3/SourcesApiV3Resource.java | 50 ++++++++++++++++------
pulsar-io/common/pom.xml | 5 +++
.../org/apache/pulsar/io/common/IOConfigUtils.java | 17 +-------
13 files changed, 271 insertions(+), 75 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index 61d3603..9f69e6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.*;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.SinkStatus;
@@ -478,14 +479,27 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
})
@Path("/builtinsinks")
public List<ConnectorDefinition> getSinkList() {
- List<ConnectorDefinition> connectorDefinitions = sink.getListOfConnectors();
- List<ConnectorDefinition> retval = new ArrayList<>();
- for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
- if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
- retval.add(connectorDefinition);
- }
- }
- return retval;
+ return sink.getSinkList();
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches information about config fields associated with the specified builtin sink",
+ response = ConfigFieldDefinition.class,
+ responseContainer = "List"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "builtin sink does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+ })
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/builtinsinks/{name}/configdefinition")
+ public List<ConfigFieldDefinition> getSinkConfigDefinition(
+ @ApiParam(value = "The name of the builtin sink")
+ final @PathParam("name") String name) throws IOException {
+ return sink.getSinkConfigDefinition(name);
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index aba9069..c25bcdd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.*;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.SourceStatus;
@@ -468,14 +469,27 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
@Produces(MediaType.APPLICATION_JSON)
@Path("/builtinsources")
public List<ConnectorDefinition> getSourceList() {
- List<ConnectorDefinition> connectorDefinitions = source.getListOfConnectors();
- List<ConnectorDefinition> retval = new ArrayList<>();
- for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
- if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
- retval.add(connectorDefinition);
- }
- }
- return retval;
+ return source.getSourceList();
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches information about config fields associated with the specified builtin source",
+ response = ConfigFieldDefinition.class,
+ responseContainer = "List"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "builtin source does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+ })
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/builtinsources/{name}/configdefinition")
+ public List<ConfigFieldDefinition> getSourceConfigDefinition(
+ @ApiParam(value = "The name of the builtin source")
+ final @PathParam("name") String name) throws IOException {
+ return source.getSourceConfigDefinition(name);
}
@POST
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConfigFieldDefinition.java
similarity index 65%
copy from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/io/ConfigFieldDefinition.java
index a5d7e61..2d242a0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConfigFieldDefinition.java
@@ -16,21 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.utils.io;
+package org.apache.pulsar.common.io;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import lombok.Data;
+import lombok.NoArgsConstructor;
-import org.apache.pulsar.common.io.ConnectorDefinition;
-
+/**
+ * Information about a Pulsar connector config field.
+ */
@Data
-public class Connectors {
- final List<ConnectorDefinition> connectors = new ArrayList<>();
- final Map<String, Path> sources = new TreeMap<>();
- final Map<String, Path> sinks = new TreeMap<>();
+@NoArgsConstructor
+public class ConfigFieldDefinition {
+
+ /**
+ * The name of the field.
+ */
+ private String fieldName;
+
+ /**
+ * The field type.
+ */
+ private String typeName;
+
+ /**
+ * Other attribute pairs associated with the field.
+ */
+ private Map<String, String> attributes;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
index e1713ba..b0045d0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.common.util;
import java.io.IOException;
import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URLClassLoader;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -304,4 +308,15 @@ public class Reflections {
}
}
}
+
+ public static List<Field> getAllFields(Class<?> type) {
+ List<Field> fields = new LinkedList<>();
+ fields.addAll(Arrays.asList(type.getDeclaredFields()));
+
+ if (type.getSuperclass() != null) {
+ fields.addAll(getAllFields(type.getSuperclass()));
+ }
+
+ return fields;
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index e96afdd..fcddf5c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -20,22 +20,28 @@ package org.apache.pulsar.functions.utils.io;
import java.io.File;
import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Collections;
+import java.util.*;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
@UtilityClass
@Slf4j
@@ -108,6 +114,38 @@ public class ConnectorUtils {
return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
}
+ public static List<ConfigFieldDefinition> getConnectorConfigDefinition(String narPath,
+ String configClassName,
+ String narExtractionDirectory) throws Exception {
+ List<ConfigFieldDefinition> retval = new LinkedList<>();
+ try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
+ Class configClass = ncl.loadClass(configClassName);
+ for (Field field : Reflections.getAllFields(configClass)) {
+ if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+ // We don't want static fields
+ continue;
+ }
+ field.setAccessible(true);
+ ConfigFieldDefinition configFieldDefinition = new ConfigFieldDefinition();
+ configFieldDefinition.setFieldName(field.getName());
+ configFieldDefinition.setTypeName(field.getType().getName());
+ Map<String, String> attributes = new HashMap<>();
+ for (Annotation annotation : field.getAnnotations()) {
+ if (annotation.annotationType().equals(FieldDoc.class)) {
+ FieldDoc fieldDoc = (FieldDoc) annotation;
+ for (Method method : FieldDoc.class.getDeclaredMethods()) {
+ Object value = method.invoke(fieldDoc);
+ attributes.put(method.getName(), value == null ? "" : value.toString());
+ }
+ }
+ }
+ configFieldDefinition.setAttributes(attributes);
+ retval.add(configFieldDefinition);
+ }
+ }
+ return retval;
+ }
+
public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException {
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
@@ -127,10 +165,16 @@ public class ConnectorUtils {
if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
connectors.sources.put(cntDef.getName(), archive);
+ if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
+ connectors.sourceConfigDefinitions.put(cntDef.getName(), getConnectorConfigDefinition(archive.toString(), cntDef.getSourceConfigClass(), narExtractionDirectory));
+ }
}
if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
connectors.sinks.put(cntDef.getName(), archive);
+ if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
+ connectors.sinkConfigDefinitions.put(cntDef.getName(), getConnectorConfigDefinition(archive.toString(), cntDef.getSinkConfigClass(), narExtractionDirectory));
+ }
}
connectors.connectors.add(cntDef);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
index a5d7e61..d23dabd 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connectors.java
@@ -26,11 +26,14 @@ import java.util.TreeMap;
import lombok.Data;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
@Data
public class Connectors {
final List<ConnectorDefinition> connectors = new ArrayList<>();
final Map<String, Path> sources = new TreeMap<>();
+ final Map<String, List<ConfigFieldDefinition>> sourceConfigDefinitions = new TreeMap<>();
final Map<String, Path> sinks = new TreeMap<>();
+ final Map<String, List<ConfigFieldDefinition>> sinkConfigDefinitions = new TreeMap<>();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 40156eb..ca61744 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
@@ -42,6 +43,14 @@ public class ConnectorsManager {
return connectors.getSources().get(sourceType);
}
+ public List<ConfigFieldDefinition> getSourceConfigDefinition(String sourceType) {
+ return connectors.getSourceConfigDefinitions().get(sourceType);
+ }
+
+ public List<ConfigFieldDefinition> getSinkConfigDefinition(String sinkType) {
+ return connectors.getSinkConfigDefinitions().get(sinkType);
+ }
+
public Path getSinkArchive(String sinkType) {
return connectors.getSinks().get(sinkType);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 2a0d3c3..07617f4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -26,6 +26,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
@@ -49,10 +51,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
import java.util.function.Supplier;
import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -665,6 +664,28 @@ public class SinksImpl extends ComponentImpl {
return config;
}
+ public List<ConnectorDefinition> getSinkList() {
+ List<ConnectorDefinition> connectorDefinitions = getListOfConnectors();
+ List<ConnectorDefinition> retval = new ArrayList<>();
+ for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+ if (!org.apache.commons.lang.StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
+ retval.add(connectorDefinition);
+ }
+ }
+ return retval;
+ }
+
+ public List<ConfigFieldDefinition> getSinkConfigDefinition(String name) {
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+ List<ConfigFieldDefinition> retval = this.worker().getConnectorsManager().getSinkConfigDefinition(name);
+ if (retval == null) {
+ throw new RestException(Response.Status.NOT_FOUND, "builtin sink does not exist");
+ }
+ return retval;
+ }
+
private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
final String namespace,
final String sinkName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 97c22eb..659e817 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -26,6 +26,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
@@ -49,10 +51,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
import java.util.function.Supplier;
import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -661,6 +660,28 @@ public class SourcesImpl extends ComponentImpl {
return config;
}
+ public List<ConnectorDefinition> getSourceList() {
+ List<ConnectorDefinition> connectorDefinitions = getListOfConnectors();
+ List<ConnectorDefinition> retval = new ArrayList<>();
+ for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+ if (!org.apache.commons.lang.StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
+ retval.add(connectorDefinition);
+ }
+ }
+ return retval;
+ }
+
+ public List<ConfigFieldDefinition> getSourceConfigDefinition(String name) {
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+ List<ConfigFieldDefinition> retval = this.worker().getConnectorsManager().getSourceConfigDefinition(name);
+ if (retval == null) {
+ throw new RestException(Response.Status.NOT_FOUND, "builtin source does not exist");
+ }
+ return retval;
+ }
+
private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
final String namespace,
final String sourceName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
index fe48afa..8ee1662 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
@@ -18,13 +18,11 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v3;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.SinkStatus;
@@ -240,13 +238,41 @@ public class SinksApiV3Resource extends FunctionApiResource {
@GET
@Path("/builtinsinks")
public List<ConnectorDefinition> getSinkList() {
- List<ConnectorDefinition> connectorDefinitions = sink.getListOfConnectors();
- List<ConnectorDefinition> retVal = new ArrayList<>();
- for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
- if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
- retVal.add(connectorDefinition);
- }
- }
- return retVal;
+ return sink.getSinkList();
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches information about config fields associated with the specified builtin sink",
+ response = ConfigFieldDefinition.class,
+ responseContainer = "List"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "builtin sink does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+ })
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/builtinsinks/{name}/configdefinition")
+ public List<ConfigFieldDefinition> getSinkConfigDefinition(
+ @ApiParam(value = "The name of the builtin sink")
+ final @PathParam("name") String name) throws IOException {
+ return sink.getSinkConfigDefinition(name);
+ }
+
+ @POST
+ @ApiOperation(
+ value = "Reload the built-in connectors, including Sources and Sinks",
+ response = Void.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "This operation requires super-user access"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/reloadBuiltInSinks")
+ public void reloadSinks() {
+ sink.reloadConnectors(clientAppId());
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
index 5dcd18c..6b3daa9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
@@ -18,13 +18,11 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v3;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.SourceStatus;
@@ -256,13 +254,41 @@ public class SourcesApiV3Resource extends FunctionApiResource {
@Produces(MediaType.APPLICATION_JSON)
@Path("/builtinsources")
public List<ConnectorDefinition> getSourceList() {
- List<ConnectorDefinition> connectorDefinitions = source.getListOfConnectors();
- List<ConnectorDefinition> retval = new ArrayList<>();
- for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
- if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
- retval.add(connectorDefinition);
- }
- }
- return retval;
+ return source.getSourceList();
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches information about config fields associated with the specified builtin source",
+ response = ConfigFieldDefinition.class,
+ responseContainer = "List"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "builtin source does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+ })
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/builtinsources/{name}/configdefinition")
+ public List<ConfigFieldDefinition> getSourceConfigDefinition(
+ @ApiParam(value = "The name of the builtin source")
+ final @PathParam("name") String name) throws IOException {
+ return source.getSourceConfigDefinition(name);
+ }
+
+ @POST
+ @ApiOperation(
+ value = "Reload the built-in connectors, including Sources and Sinks",
+ response = Void.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "This operation requires super-user access"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/reloadBuiltInSources")
+ public void reloadSources() {
+ source.reloadConnectors(clientAppId());
}
}
diff --git a/pulsar-io/common/pom.xml b/pulsar-io/common/pom.xml
index e48d4d9..8fb2129 100644
--- a/pulsar-io/common/pom.xml
+++ b/pulsar-io/common/pom.xml
@@ -40,6 +40,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
index 8066ea6..efb72e8 100644
--- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
+++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
@@ -20,16 +20,14 @@ package org.apache.pulsar.io.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -44,21 +42,10 @@ public class IOConfigUtils {
}
- public static List<Field> getAllFields(Class<?> type) {
- List<Field> fields = new LinkedList<>();
- fields.addAll(Arrays.asList(type.getDeclaredFields()));
-
- if (type.getSuperclass() != null) {
- fields.addAll(getAllFields(type.getSuperclass()));
- }
-
- return fields;
- }
-
private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, Function<String, String> secretsGetter) {
Map<String, Object> configs = new HashMap<>(map);
- for (Field field : getAllFields(clazz)) {
+ for (Field field : Reflections.getAllFields(clazz)) {
field.setAccessible(true);
for (Annotation annotation : field.getAnnotations()) {
if (annotation.annotationType().equals(FieldDoc.class)) {