You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/13 18:29:42 UTC

[incubator-pulsar] branch master updated: Fix: get function status with auth enable (#2516)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e5a7b53  Fix: get function status with auth enable (#2516)
e5a7b53 is described below

commit e5a7b532ffbf1d8813fc31403162fe3c17e937f5
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Sep 13 11:29:37 2018 -0700

    Fix: get function status with auth enable (#2516)
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  2 +-
 .../client/admin/internal/FunctionsImpl.java       |  5 ++-
 .../functions/worker/FunctionRuntimeManager.java   | 41 ++++++++--------------
 .../functions/worker/rest/api/FunctionsImpl.java   |  6 ++--
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  2 +-
 5 files changed, 23 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index def5452..62d12ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -160,7 +160,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, functionName, instanceId);
+            tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @GET
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 7dc7050..1d315d7 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -110,9 +110,8 @@ public class FunctionsImpl extends BaseResource implements Functions {
         }
     }
 
-    @Override
-    public FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id)
-            throws PulsarAdminException {
+    public FunctionStatus getFunctionStatus(
+            String tenant, String namespace, String function, int id) throws PulsarAdminException {
         try {
             Response response = request(
                     functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status"))
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 93828de..ee6eeec 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,6 +26,7 @@ import java.net.URISyntaxException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
@@ -248,7 +249,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
      * @return the function status
      */
     public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace,
-                                                                          String functionName, int instanceId) {
+            String functionName, int instanceId, URI uri) {
         Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
         final String assignedWorkerId = assignment.getWorkerId();
         final String workerId = this.workerConfig.getWorkerId();
@@ -306,23 +307,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
                 return functionStatusBuilder.build();
             }
 
-            Client client = ClientBuilder.newClient();
-
-            // TODO: implement authentication/authorization
-            String jsonResponse = client.target(String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/status",
-                    workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId))
-                    .request(MediaType.TEXT_PLAIN)
-                    .get(String.class);
-
-            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
-            try {
-                org.apache.pulsar.functions.utils.Utils.mergeJson(jsonResponse, functionStatusBuilder);
-            } catch (IOException e) {
-                log.warn("Got invalid function status response from {}", workerInfo, e);
-                throw new RuntimeException(e);
-            }
-            functionStatusBuilder.setWorkerId(assignedWorkerId);
-            functionStatus = functionStatusBuilder.build();
+            URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
         }
 
         return functionStatus;
@@ -426,9 +412,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
      * @param namespace the namespace the function belongs to
      * @param functionName the function name
      * @return a list of function statuses
+     * @throws PulsarAdminException 
      */
     public InstanceCommunication.FunctionStatusList getAllFunctionStatus(
-            String tenant, String namespace, String functionName) {
+            String tenant, String namespace, String functionName) throws PulsarAdminException {
 
         Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
 
@@ -438,13 +425,15 @@ public class FunctionRuntimeManager implements AutoCloseable{
         }
 
         for (Assignment assignment : assignments) {
-
-            InstanceCommunication.FunctionStatus functionStatus = this.getFunctionInstanceStatus(
-                    assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
-                    assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
-                    assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
-                    assignment.getInstance().getInstanceId());
-
+            boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
+            InstanceCommunication.FunctionStatus functionStatus = isOwner
+                    ? (getFunctionInstanceStatus(tenant, namespace, functionName,
+                            assignment.getInstance().getInstanceId(), null))
+                    : this.functionAdmin.functions().getFunctionStatus(
+                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                            assignment.getInstance().getInstanceId());
             functionStatusListBuilder.addFunctionStatusList(functionStatus);
         }
         return functionStatusListBuilder.build();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 136bab0..df82c0d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -332,7 +332,7 @@ public class FunctionsImpl {
     }
 
     public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
-            final String instanceId) throws IOException {
+            final String instanceId, URI uri) throws IOException {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -358,7 +358,9 @@ public class FunctionsImpl {
         FunctionStatus functionStatus = null;
         try {
             functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName,
-                    Integer.parseInt(instanceId));
+                    Integer.parseInt(instanceId), uri);
+        } catch (WebApplicationException we) {
+            throw we;
         } catch (Exception e) {
             log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 95fe687..6c6fb18 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -104,7 +104,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, functionName, instanceId);
+            tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @GET