You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/01 16:35:14 UTC

[incubator-pulsar] branch master updated: adding function worker initialized check (#1697)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c4b1c4  adding function worker initialized check (#1697)
1c4b1c4 is described below

commit 1c4b1c4b9927ccb3d3f45feff97c5c058f46fe1c
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue May 1 09:35:12 2018 -0700

    adding function worker initialized check (#1697)
    
    * adding function worker intialized check
    
    * removing newline
    
    * refactoring duplicate code
    
    * fixing unit tests
---
 .../pulsar/client/admin/internal/BaseResource.java |  2 +-
 .../client/admin/internal/FunctionsImpl.java       |  2 -
 .../pulsar/functions/worker/WorkerService.java     |  4 ++
 .../functions/worker/rest/api/FunctionsImpl.java   | 63 ++++++++++++++++++++++
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  1 +
 5 files changed, 69 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index a0f747c..96671ec 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -153,7 +153,7 @@ public abstract class BaseResource {
             if (e.getCause() instanceof java.net.ConnectException) {
                 return new ConnectException(e.getCause());
             } else {
-                return new HttpErrorException(e);
+                return new PulsarAdminException((ServerErrorException) e);
             }
         } else if (e instanceof WebApplicationException) {
             // Handle 5xx exceptions
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 9d5b8ee..66da497 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -42,8 +42,6 @@ import javax.ws.rs.core.Response;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.List;
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index e33dcba..dfe5834 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -45,6 +45,7 @@ public class WorkerService {
     private Namespace dlogNamespace;
     private MembershipManager membershipManager;
     private SchedulerManager schedulerManager;
+    private boolean isInitialized = false;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
@@ -117,6 +118,9 @@ public class WorkerService {
             // Start function runtime manager
             this.functionRuntimeManager.start();
 
+            // indicate function worker service is done intializing
+            this.isInitialized = true;
+
         } catch (Exception e) {
             log.error("Error Starting up in worker", e);
             throw new RuntimeException(e);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 49c652f..5261750 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -82,6 +82,17 @@ public class FunctionsImpl {
         }
     }
 
+    private boolean isWorkerServiceAvailable() {
+        WorkerService workerService = workerServiceSupplier.get();
+        if (workerService == null) {
+            return false;
+        }
+        if (!workerService.isInitialized()) {
+            return false;
+        }
+        return true;
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
@@ -91,6 +102,11 @@ public class FunctionsImpl {
                                      final @FormDataParam("data") InputStream uploadedInputStream,
                                      final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                      final @FormDataParam("functionDetails") String functionDetailsJson) {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         FunctionDetails functionDetails;
         // validate parameters
         try {
@@ -141,6 +157,10 @@ public class FunctionsImpl {
                                    final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                    final @FormDataParam("functionDetails") String functionDetailsJson) {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         FunctionDetails functionDetails;
         // validate parameters
         try {
@@ -187,6 +207,10 @@ public class FunctionsImpl {
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("functionName") String functionName) {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -244,6 +268,10 @@ public class FunctionsImpl {
                                     final @PathParam("functionName") String functionName)
             throws IOException {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateGetFunctionRequestParams(tenant, namespace, functionName);
@@ -276,6 +304,10 @@ public class FunctionsImpl {
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
@@ -319,6 +351,10 @@ public class FunctionsImpl {
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateGetFunctionRequestParams(tenant, namespace, functionName);
@@ -360,6 +396,10 @@ public class FunctionsImpl {
     public Response listFunctions(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateListFunctionRequestParams(tenant, namespace);
@@ -429,6 +469,11 @@ public class FunctionsImpl {
     @GET
     @Path("/cluster")
     public Response getCluster() {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         MembershipManager membershipManager = worker().getMembershipManager();
         List<MembershipManager.WorkerInfo> members = membershipManager.getCurrentMembership();
         return Response.status(Status.OK).entity(new Gson().toJson(members)).build();
@@ -437,6 +482,11 @@ public class FunctionsImpl {
     @GET
     @Path("/assignments")
     public Response getAssignments() {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments();
         Map<String, Collection<String>> ret = new HashMap<>();
@@ -456,6 +506,11 @@ public class FunctionsImpl {
                                     final @FormDataParam("data") String input,
                                     final @FormDataParam("dataStream") InputStream uploadedInputStream,
                                     final @FormDataParam("topic") String topic) {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         FunctionDetails functionDetails;
         // validate parameters
         try {
@@ -718,4 +773,12 @@ public class FunctionsImpl {
         }
     }
 
+    private Response getUnavailableResponse() {
+        return Response.status(Status.SERVICE_UNAVAILABLE)
+                .type(MediaType.APPLICATION_JSON)
+                .entity(new ErrorData("Function worker service is not done initializing. "
+                        + "Please try again in a little while."))
+                .build();
+    }
+
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 31c928d..cddc152 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -119,6 +119,7 @@ public class FunctionApiV2ResourceTest {
         this.mockedWorkerService = mock(WorkerService.class);
         when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
         when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
+        when(mockedWorkerService.isInitialized()).thenReturn(true);
 
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.