You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/04/22 04:19:03 UTC
[pulsar] branch master updated: deprecate and relocate connector related endpoints in functions REST API (#4087)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1026b07 deprecate and relocate connector related endpoints in functions REST API (#4087)
1026b07 is described below
commit 1026b07327b26fc416b2735668ad84a625345adb
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Apr 21 21:18:57 2019 -0700
deprecate and relocate connector related endpoints in functions REST API (#4087)
* deprecate and relocate connector related endpoints in functions REST API
* cleaning up
* cleaning up
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 4 ++++
.../org/apache/pulsar/broker/admin/v2/Worker.java | 18 ++++++++++++++++
.../org/apache/pulsar/client/admin/Functions.java | 9 ++++++++
.../pulsar/functions/worker/rest/RestUtils.java | 6 ++++++
.../functions/worker/rest/api/ComponentImpl.java | 7 +------
.../pulsar/functions/worker/rest/api/SinkImpl.java | 2 ++
.../functions/worker/rest/api/SourceImpl.java | 2 ++
.../functions/worker/rest/api/WorkerImpl.java | 24 ++++++++++++++--------
.../worker/rest/api/v2/FunctionApiV2Resource.java | 4 ++++
.../worker/rest/api/v2/WorkerApiV2Resource.java | 12 +++++++++++
.../worker/rest/api/v3/FunctionApiV3Resource.java | 4 ++++
11 files changed, 77 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 4c79bfd..ba1e58d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -400,6 +400,10 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@ApiResponse(code = 408, message = "Request timeout")
})
@Path("/connectors")
+ @Deprecated
+ /**
+ * Deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
+ */
public List<ConnectorDefinition> getConnectorsList() throws IOException {
return functions.getListOfConnectors();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
index a40132d..8307b1a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -24,6 +24,7 @@ import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
@@ -31,6 +32,7 @@ import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -96,4 +98,20 @@ public class Worker extends AdminResource implements Supplier<WorkerService> {
public Map<String, Collection<String>> getAssignments() {
return worker.getAssignments();
}
+
+ @GET
+ @ApiOperation(
+ value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
+ response = List.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout")
+ })
+ @Path("/connectors")
+ @Produces(MediaType.APPLICATION_JSON)
+ public List<ConnectorDefinition> getConnectorsList() throws IOException {
+ return worker.getListOfConnectors();
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 481c5fd..5c14aa9 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -379,30 +379,39 @@ public interface Functions {
void downloadFunction(String destinationFile, String path) throws PulsarAdminException;
/**
+ * Deprecated in favor of getting sources and sinks for their own APIs
+ *
* Fetches a list of supported Pulsar IO connectors currently running in cluster mode
*
* @throws PulsarAdminException
* Unexpected error
*
*/
+ @Deprecated
List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException;
/**
+ * Deprecated in favor of getting sources and sinks for their own APIs
+ *
* Fetches a list of supported Pulsar IO sources currently running in cluster mode
*
* @throws PulsarAdminException
* Unexpected error
*
*/
+ @Deprecated
Set<String> getSources() throws PulsarAdminException;
/**
+ * Deprecated in favor of getting sources and sinks for their own APIs
+ *
* Fetches a list of supported Pulsar IO sinks currently running in cluster mode
*
* @throws PulsarAdminException
* Unexpected error
*
*/
+ @Deprecated
Set<String> getSinks() throws PulsarAdminException;
/**
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestUtils.java
index 133e6dc..6378b34 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestUtils.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import javax.ws.rs.core.Response;
+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class RestUtils {
@@ -36,4 +38,8 @@ public final class RestUtils {
return createBaseMessage(message).toString();
}
+ public static void throwUnavailableException() {
+ throw new RestException(Response.Status.SERVICE_UNAVAILABLE,
+ "Function worker service is not done initializing. " + "Please try again in a little while.");
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index b6652cf..87dc897 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -114,10 +114,10 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
import static org.apache.pulsar.functions.utils.ComponentType.SINK;
import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
-import static org.apache.pulsar.functions.utils.FunctionCommon.extractFileFromPkgURL;
import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
@Slf4j
public abstract class ComponentImpl {
@@ -1675,11 +1675,6 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
}
}
- protected static void throwUnavailableException() {
- throw new RestException(Status.SERVICE_UNAVAILABLE,
- "Function worker service is not done initializing. " + "Please try again in a little while.");
- }
-
private void throwStateStoreUnvailableResponse() {
throw new RestException(Status.SERVICE_UNAVAILABLE,
"State storage client is not done initializing. " + "Please try again in a little while.");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 3661d01..c0cac01 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -41,6 +41,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
@Slf4j
public class SinkImpl extends ComponentImpl {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index aa8e074..43bb4e7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -41,6 +41,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
@Slf4j
public class SourceImpl extends ComponentImpl {
private class GetSourceStatus extends GetStatus<SourceStatus, SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 041ac26..d27eaef 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker.rest.api;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
@@ -45,6 +46,7 @@ import java.util.Map;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
@Slf4j
public class WorkerImpl {
@@ -77,7 +79,7 @@ public class WorkerImpl {
public List<WorkerInfo> getCluster() {
if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+ throwUnavailableException();
}
List<WorkerInfo> workers = worker().getMembershipManager().getCurrentMembership();
return workers;
@@ -85,9 +87,8 @@ public class WorkerImpl {
public WorkerInfo getClusterLeader() {
if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+ throwUnavailableException();
}
-
MembershipManager membershipManager = worker().getMembershipManager();
WorkerInfo leader = membershipManager.getLeader();
@@ -99,9 +100,8 @@ public class WorkerImpl {
}
public Map<String, Collection<String>> getAssignments() {
-
if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+ throwUnavailableException();
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -128,9 +128,7 @@ public class WorkerImpl {
private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics() {
if (!isWorkerServiceAvailable()) {
- throw new WebApplicationException(
- Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData("Function worker service is not available")).build());
+ throwUnavailableException();
}
return worker().getMetricsGenerator().generate();
}
@@ -146,7 +144,7 @@ public class WorkerImpl {
private List<WorkerFunctionInstanceStats> getFunctionsMetrics() throws IOException {
if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+ throwUnavailableException();
}
WorkerService workerService = worker();
@@ -184,4 +182,12 @@ public class WorkerImpl {
}
return metricsList;
}
+
+ public List<ConnectorDefinition> getListOfConnectors() {
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ return this.worker().getConnectorsManager().getConnectors();
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index bf5ec9a..23d3b4f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -311,6 +311,10 @@ public class FunctionApiV2Resource extends FunctionApiResource {
@ApiResponse(code = 408, message = "Request timeout")
})
@Path("/connectors")
+ /**
+ * Deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
+ */
+ @Deprecated
public List<ConnectorDefinition> getConnectorsList() throws IOException {
return functions.getListOfConnectors();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
index 29d3ff0..e2b8c03 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
@@ -25,6 +25,7 @@ import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
@@ -36,6 +37,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -120,4 +122,14 @@ public class WorkerApiV2Resource implements Supplier<WorkerService> {
public Map<String, Collection<String>> getAssignments() {
return worker.getAssignments();
}
+
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout")
+ })
+ @Path("/connectors")
+ public List<ConnectorDefinition> getConnectorsList() throws IOException {
+ return worker.getListOfConnectors();
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index afbe39c..6159b63 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -306,6 +306,10 @@ public class FunctionApiV3Resource extends FunctionApiResource {
@GET
@Path("/connectors")
+ /**
+ * Deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
+ */
+ @Deprecated
public List<ConnectorDefinition> getConnectorsList() throws IOException {
return functions.getListOfConnectors();
}