You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/04/22 04:19:03 UTC

[pulsar] branch master updated: deprecate and relocate connector related endpoints in functions REST API (#4087)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1026b07   deprecate and relocate connector related endpoints in functions REST API (#4087)
1026b07 is described below

commit 1026b07327b26fc416b2735668ad84a625345adb
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Apr 21 21:18:57 2019 -0700

     deprecate and relocate connector related endpoints in functions REST API (#4087)
    
    * deprecate and relocate connector related endpoints in functions REST API
    
    * cleaning up
    
    * cleaning up
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  4 ++++
 .../org/apache/pulsar/broker/admin/v2/Worker.java  | 18 ++++++++++++++++
 .../org/apache/pulsar/client/admin/Functions.java  |  9 ++++++++
 .../pulsar/functions/worker/rest/RestUtils.java    |  6 ++++++
 .../functions/worker/rest/api/ComponentImpl.java   |  7 +------
 .../pulsar/functions/worker/rest/api/SinkImpl.java |  2 ++
 .../functions/worker/rest/api/SourceImpl.java      |  2 ++
 .../functions/worker/rest/api/WorkerImpl.java      | 24 ++++++++++++++--------
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  4 ++++
 .../worker/rest/api/v2/WorkerApiV2Resource.java    | 12 +++++++++++
 .../worker/rest/api/v3/FunctionApiV3Resource.java  |  4 ++++
 11 files changed, 77 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 4c79bfd..ba1e58d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -400,6 +400,10 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             @ApiResponse(code = 408, message = "Request timeout")
     })
     @Path("/connectors")
+    @Deprecated
+    /**
+     * Deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
+     */
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
index a40132d..8307b1a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -24,6 +24,7 @@ import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
 
@@ -31,6 +32,7 @@ import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -96,4 +98,20 @@ public class Worker extends AdminResource implements Supplier<WorkerService> {
     public Map<String, Collection<String>> getAssignments() {
         return worker.getAssignments();
     }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
+    @Path("/connectors")
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<ConnectorDefinition> getConnectorsList() throws IOException {
+        return worker.getListOfConnectors();
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 481c5fd..5c14aa9 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -379,30 +379,39 @@ public interface Functions {
     void downloadFunction(String destinationFile, String path) throws PulsarAdminException;
 
     /**
+     * Deprecated in favor of getting sources and sinks for their own APIs
+     *
      * Fetches a list of supported Pulsar IO connectors currently running in cluster mode
      *
      * @throws PulsarAdminException
      *             Unexpected error
      *
      */
+    @Deprecated
     List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException;
 
     /**
+     * Deprecated in favor of getting sources and sinks for their own APIs
+     *
      * Fetches a list of supported Pulsar IO sources currently running in cluster mode
      *
      * @throws PulsarAdminException
      *             Unexpected error
      *
      */
+    @Deprecated
     Set<String> getSources() throws PulsarAdminException;
 
     /**
+     * Deprecated in favor of getting sources and sinks for their own APIs
+     *
      * Fetches a list of supported Pulsar IO sinks currently running in cluster mode
      *
      * @throws PulsarAdminException
      *             Unexpected error
      *
      */
+    @Deprecated
     Set<String> getSinks() throws PulsarAdminException;
 
     /**
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestUtils.java
index 133e6dc..6378b34 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestUtils.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
+import javax.ws.rs.core.Response;
+
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class RestUtils {
 
@@ -36,4 +38,8 @@ public final class RestUtils {
         return createBaseMessage(message).toString();
     }
 
+    public static void throwUnavailableException() {
+        throw new RestException(Response.Status.SERVICE_UNAVAILABLE,
+                "Function worker service is not done initializing. " + "Please try again in a little while.");
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index b6652cf..87dc897 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -114,10 +114,10 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
 import static org.apache.pulsar.functions.utils.ComponentType.SINK;
 import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
-import static org.apache.pulsar.functions.utils.FunctionCommon.extractFileFromPkgURL;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
 import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
 public abstract class ComponentImpl {
@@ -1675,11 +1675,6 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
         }
     }
 
-    protected static void throwUnavailableException() {
-        throw new RestException(Status.SERVICE_UNAVAILABLE,
-                "Function worker service is not done initializing. " + "Please try again in a little while.");
-    }
-
     private void throwStateStoreUnvailableResponse() {
         throw new RestException(Status.SERVICE_UNAVAILABLE,
                 "State storage client is not done initializing. " + "Please try again in a little while.");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 3661d01..c0cac01 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -41,6 +41,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.function.Supplier;
 
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
 @Slf4j
 public class SinkImpl extends ComponentImpl {
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index aa8e074..43bb4e7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -41,6 +41,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.function.Supplier;
 
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
 @Slf4j
 public class SourceImpl extends ComponentImpl {
     private class GetSourceStatus extends GetStatus<SourceStatus, SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 041ac26..d27eaef 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker.rest.api;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
@@ -45,6 +46,7 @@ import java.util.Map;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
 public class WorkerImpl {
@@ -77,7 +79,7 @@ public class WorkerImpl {
 
     public List<WorkerInfo> getCluster() {
         if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+            throwUnavailableException();
         }
         List<WorkerInfo> workers = worker().getMembershipManager().getCurrentMembership();
         return workers;
@@ -85,9 +87,8 @@ public class WorkerImpl {
 
     public WorkerInfo getClusterLeader() {
         if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+            throwUnavailableException();
         }
-
         MembershipManager membershipManager = worker().getMembershipManager();
         WorkerInfo leader = membershipManager.getLeader();
 
@@ -99,9 +100,8 @@ public class WorkerImpl {
     }
 
     public Map<String, Collection<String>> getAssignments() {
-
         if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+            throwUnavailableException();
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -128,9 +128,7 @@ public class WorkerImpl {
 
     private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics() {
         if (!isWorkerServiceAvailable()) {
-            throw new WebApplicationException(
-                    Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is not available")).build());
+            throwUnavailableException();
         }
         return worker().getMetricsGenerator().generate();
     }
@@ -146,7 +144,7 @@ public class WorkerImpl {
 
     private List<WorkerFunctionInstanceStats> getFunctionsMetrics() throws IOException {
         if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+            throwUnavailableException();
         }
 
         WorkerService workerService = worker();
@@ -184,4 +182,12 @@ public class WorkerImpl {
         }
         return metricsList;
     }
+
+    public List<ConnectorDefinition> getListOfConnectors() {
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        return this.worker().getConnectorsManager().getConnectors();
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index bf5ec9a..23d3b4f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -311,6 +311,10 @@ public class FunctionApiV2Resource extends FunctionApiResource {
             @ApiResponse(code = 408, message = "Request timeout")
     })
     @Path("/connectors")
+    /**
+     * Deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
+     */
+    @Deprecated
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
index 29d3ff0..e2b8c03 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
@@ -25,6 +25,7 @@ import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
 
@@ -36,6 +37,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -120,4 +122,14 @@ public class WorkerApiV2Resource implements Supplier<WorkerService> {
     public Map<String, Collection<String>> getAssignments() {
         return worker.getAssignments();
     }
+
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
+    @Path("/connectors")
+    public List<ConnectorDefinition> getConnectorsList() throws IOException {
+        return worker.getListOfConnectors();
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index afbe39c..6159b63 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -306,6 +306,10 @@ public class FunctionApiV3Resource extends FunctionApiResource {
 
     @GET
     @Path("/connectors")
+    /**
+     * Deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
+     */
+    @Deprecated
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }