You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/01 16:35:14 UTC
[incubator-pulsar] branch master updated: adding function worker
initialized check (#1697)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1c4b1c4 adding function worker initialized check (#1697)
1c4b1c4 is described below
commit 1c4b1c4b9927ccb3d3f45feff97c5c058f46fe1c
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue May 1 09:35:12 2018 -0700
adding function worker initialized check (#1697)
* adding function worker initialized check
* removing newline
* refactoring duplicate code
* fixing unit tests
---
.../pulsar/client/admin/internal/BaseResource.java | 2 +-
.../client/admin/internal/FunctionsImpl.java | 2 -
.../pulsar/functions/worker/WorkerService.java | 4 ++
.../functions/worker/rest/api/FunctionsImpl.java | 63 ++++++++++++++++++++++
.../rest/api/v2/FunctionApiV2ResourceTest.java | 1 +
5 files changed, 69 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index a0f747c..96671ec 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -153,7 +153,7 @@ public abstract class BaseResource {
if (e.getCause() instanceof java.net.ConnectException) {
return new ConnectException(e.getCause());
} else {
- return new HttpErrorException(e);
+ return new PulsarAdminException((ServerErrorException) e);
}
} else if (e instanceof WebApplicationException) {
// Handle 5xx exceptions
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 9d5b8ee..66da497 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -42,8 +42,6 @@ import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index e33dcba..dfe5834 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -45,6 +45,7 @@ public class WorkerService {
private Namespace dlogNamespace;
private MembershipManager membershipManager;
private SchedulerManager schedulerManager;
+ private boolean isInitialized = false;
public WorkerService(WorkerConfig workerConfig) {
this.workerConfig = workerConfig;
@@ -117,6 +118,9 @@ public class WorkerService {
// Start function runtime manager
this.functionRuntimeManager.start();
+ // indicate function worker service is done initializing
+ this.isInitialized = true;
+
} catch (Exception e) {
log.error("Error Starting up in worker", e);
throw new RuntimeException(e);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 49c652f..5261750 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -82,6 +82,17 @@ public class FunctionsImpl {
}
}
+ private boolean isWorkerServiceAvailable() {
+ WorkerService workerService = workerServiceSupplier.get();
+ if (workerService == null) {
+ return false;
+ }
+ if (!workerService.isInitialized()) {
+ return false;
+ }
+ return true;
+ }
+
@POST
@Path("/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
@@ -91,6 +102,11 @@ public class FunctionsImpl {
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("functionDetails") String functionDetailsJson) {
+
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
FunctionDetails functionDetails;
// validate parameters
try {
@@ -141,6 +157,10 @@ public class FunctionsImpl {
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("functionDetails") String functionDetailsJson) {
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
FunctionDetails functionDetails;
// validate parameters
try {
@@ -187,6 +207,10 @@ public class FunctionsImpl {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
// validate parameters
try {
validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -244,6 +268,10 @@ public class FunctionsImpl {
final @PathParam("functionName") String functionName)
throws IOException {
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
// validate parameters
try {
validateGetFunctionRequestParams(tenant, namespace, functionName);
@@ -276,6 +304,10 @@ public class FunctionsImpl {
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
// validate parameters
try {
validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
@@ -319,6 +351,10 @@ public class FunctionsImpl {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
// validate parameters
try {
validateGetFunctionRequestParams(tenant, namespace, functionName);
@@ -360,6 +396,10 @@ public class FunctionsImpl {
public Response listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
// validate parameters
try {
validateListFunctionRequestParams(tenant, namespace);
@@ -429,6 +469,11 @@ public class FunctionsImpl {
@GET
@Path("/cluster")
public Response getCluster() {
+
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
MembershipManager membershipManager = worker().getMembershipManager();
List<MembershipManager.WorkerInfo> members = membershipManager.getCurrentMembership();
return Response.status(Status.OK).entity(new Gson().toJson(members)).build();
@@ -437,6 +482,11 @@ public class FunctionsImpl {
@GET
@Path("/assignments")
public Response getAssignments() {
+
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments();
Map<String, Collection<String>> ret = new HashMap<>();
@@ -456,6 +506,11 @@ public class FunctionsImpl {
final @FormDataParam("data") String input,
final @FormDataParam("dataStream") InputStream uploadedInputStream,
final @FormDataParam("topic") String topic) {
+
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
FunctionDetails functionDetails;
// validate parameters
try {
@@ -718,4 +773,12 @@ public class FunctionsImpl {
}
}
+ private Response getUnavailableResponse() {
+ return Response.status(Status.SERVICE_UNAVAILABLE)
+ .type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("Function worker service is not done initializing. "
+ + "Please try again in a little while."))
+ .build();
+ }
+
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 31c928d..cddc152 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -119,6 +119,7 @@ public class FunctionApiV2ResourceTest {
this.mockedWorkerService = mock(WorkerService.class);
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
+ when(mockedWorkerService.isInitialized()).thenReturn(true);
// worker config
WorkerConfig workerConfig = new WorkerConfig()
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.