You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/09 05:06:05 UTC

[pulsar] branch master updated: Fix getstatus logic in kubernetes runtime (#2748)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 90aa408  Fix getstatus logic in kubernetes runtime (#2748)
90aa408 is described below

commit 90aa408b9ddc53629bdc5c0e540cf5335c34399b
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Oct 8 22:06:01 2018 -0700

    Fix getstatus logic in kubernetes runtime (#2748)
    
    ### Motivation
    
    When running in a Kubernetes (k8s) environment, all instances of a function are owned by a single function worker. If there are multiple workers and a getstatus request arrives at a worker that is not the owner of the function, that worker should redirect the request to the worker that owns it.
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  2 +-
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    |  2 +-
 .../src/main/proto/InstanceCommunication.proto     |  1 +
 .../functions/worker/FunctionRuntimeManager.java   | 41 ++++++++++++++++++----
 .../functions/worker/rest/api/FunctionsImpl.java   |  8 +++--
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  2 +-
 6 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index cff3c34..132eaf3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -179,7 +179,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStatus(
-            tenant, namespace, functionName);
+            tenant, namespace, functionName, uri.getRequestUri());
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 907cf86..36db99f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -366,7 +366,7 @@ public class PulsarSinkE2ETest {
         FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
         functionRuntimeManager.updateRates();
         FunctionStatusList functionStats = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
-                functionName);
+                functionName, null);
 
         int numInstances = functionStats.getFunctionStatusListCount();
         assertEquals(numInstances, 1);
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 65d1b2f..d56a41d 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -55,6 +55,7 @@ message FunctionStatus {
 }
 
 message FunctionStatusList {
+    string error = 2;
     repeated FunctionStatus functionStatusList = 1;
 }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 1b8a590..47d317f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -306,8 +306,12 @@ public class FunctionRuntimeManager implements AutoCloseable{
                 return functionStatusBuilder.build();
             }
 
-            URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
-            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            if (uri == null) {
+                throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+            } else {
+                URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            }
         }
 
         return functionStatus;
@@ -346,8 +350,12 @@ public class FunctionRuntimeManager implements AutoCloseable{
                         .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build();
             }
 
-            URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
-            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            if (uri == null) {
+                throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+            } else {
+                URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            }
         }
     }
 
@@ -471,7 +479,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
      * @throws PulsarAdminException 
      */
     public InstanceCommunication.FunctionStatusList getAllFunctionStatus(
-            String tenant, String namespace, String functionName) throws PulsarAdminException {
+            String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException {
 
         Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
 
@@ -491,7 +499,28 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     functionStatusListBuilder.addFunctionStatusList(functionStatus);
                 }
             } else {
-                return this.functionAdmin.functions().getFunctionStatus(tenant, namespace, functionName);
+                // find the hostname/port of the worker who is the owner
+
+                List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
+                WorkerInfo workerInfo = null;
+                for (WorkerInfo entry: workerInfoList) {
+                    if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                        workerInfo = entry;
+                    }
+                }
+                if (workerInfo == null) {
+                    InstanceCommunication.FunctionStatusList.Builder functionStatusBuilder
+                            = InstanceCommunication.FunctionStatusList.newBuilder();
+                    functionStatusBuilder.setError("Function not yet scheduled");
+                    return functionStatusBuilder.build();
+                }
+
+                if (uri == null) {
+                    throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+                } else {
+                    URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+                }
             }
         } else {
             for (Assignment assignment : assignments) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 3124731..6ab5f0d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -508,13 +508,15 @@ public class FunctionsImpl {
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
             return functionRuntimeManager.stopFunctionInstances(tenant, namespace, functionName, restart);
+        } catch (WebApplicationException we) {
+            throw we;
         } catch (Exception e) {
             log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
     }
 
-    public Response getFunctionStatus(final String tenant, final String namespace, final String functionName)
+    public Response getFunctionStatus(final String tenant, final String namespace, final String functionName, URI uri)
             throws IOException {
 
         if (!isWorkerServiceAvailable()) {
@@ -540,7 +542,9 @@ public class FunctionsImpl {
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         InstanceCommunication.FunctionStatusList functionStatusList = null;
         try {
-            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName);
+            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName, uri);
+        } catch (WebApplicationException we) {
+            throw we;
         } catch (Exception e) {
             log.error("Got Exception Getting Status", e);
             FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 1e44a60..46c7974 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -115,7 +115,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStatus(
-            tenant, namespace, functionName);
+            tenant, namespace, functionName, uri.getRequestUri());
     }
 
     @GET