You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/09 05:06:05 UTC
[pulsar] branch master updated: Fix getstatus logic in kubernetes
runtime (#2748)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 90aa408 Fix getstatus logic in kubernetes runtime (#2748)
90aa408 is described below
commit 90aa408b9ddc53629bdc5c0e540cf5335c34399b
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Oct 8 22:06:01 2018 -0700
Fix getstatus logic in kubernetes runtime (#2748)
### Motivation
When running in a Kubernetes (k8s) environment, all instances of a function are owned by a single function worker. If there are multiple workers and a getstatus request reaches a worker that is not the owner of the function, that worker should redirect the request to the worker that owns it.
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 2 +-
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 2 +-
.../src/main/proto/InstanceCommunication.proto | 1 +
.../functions/worker/FunctionRuntimeManager.java | 41 ++++++++++++++++++----
.../functions/worker/rest/api/FunctionsImpl.java | 8 +++--
.../worker/rest/api/v2/FunctionApiV2Resource.java | 2 +-
6 files changed, 45 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index cff3c34..132eaf3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -179,7 +179,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStatus(
- tenant, namespace, functionName);
+ tenant, namespace, functionName, uri.getRequestUri());
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 907cf86..36db99f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -366,7 +366,7 @@ public class PulsarSinkE2ETest {
FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
functionRuntimeManager.updateRates();
FunctionStatusList functionStats = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
- functionName);
+ functionName, null);
int numInstances = functionStats.getFunctionStatusListCount();
assertEquals(numInstances, 1);
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 65d1b2f..d56a41d 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -55,6 +55,7 @@ message FunctionStatus {
}
message FunctionStatusList {
+ string error = 2;
repeated FunctionStatus functionStatusList = 1;
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 1b8a590..47d317f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -306,8 +306,12 @@ public class FunctionRuntimeManager implements AutoCloseable{
return functionStatusBuilder.build();
}
- URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
- throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ if (uri == null) {
+ throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+ } else {
+ URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
}
return functionStatus;
@@ -346,8 +350,12 @@ public class FunctionRuntimeManager implements AutoCloseable{
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build();
}
- URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
- throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ if (uri == null) {
+ throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+ } else {
+ URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
}
}
@@ -471,7 +479,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
* @throws PulsarAdminException
*/
public InstanceCommunication.FunctionStatusList getAllFunctionStatus(
- String tenant, String namespace, String functionName) throws PulsarAdminException {
+ String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException {
Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
@@ -491,7 +499,28 @@ public class FunctionRuntimeManager implements AutoCloseable{
functionStatusListBuilder.addFunctionStatusList(functionStatus);
}
} else {
- return this.functionAdmin.functions().getFunctionStatus(tenant, namespace, functionName);
+ // find the hostname/port of the worker who is the owner
+
+ List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
+ WorkerInfo workerInfo = null;
+ for (WorkerInfo entry: workerInfoList) {
+ if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+ workerInfo = entry;
+ }
+ }
+ if (workerInfo == null) {
+ InstanceCommunication.FunctionStatusList.Builder functionStatusBuilder
+ = InstanceCommunication.FunctionStatusList.newBuilder();
+ functionStatusBuilder.setError("Function not yet scheduled");
+ return functionStatusBuilder.build();
+ }
+
+ if (uri == null) {
+ throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+ } else {
+ URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
}
} else {
for (Assignment assignment : assignments) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 3124731..6ab5f0d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -508,13 +508,15 @@ public class FunctionsImpl {
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
try {
return functionRuntimeManager.stopFunctionInstances(tenant, namespace, functionName, restart);
+ } catch (WebApplicationException we) {
+ throw we;
} catch (Exception e) {
log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName, e);
return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
}
}
- public Response getFunctionStatus(final String tenant, final String namespace, final String functionName)
+ public Response getFunctionStatus(final String tenant, final String namespace, final String functionName, URI uri)
throws IOException {
if (!isWorkerServiceAvailable()) {
@@ -540,7 +542,9 @@ public class FunctionsImpl {
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
InstanceCommunication.FunctionStatusList functionStatusList = null;
try {
- functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName);
+ functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName, uri);
+ } catch (WebApplicationException we) {
+ throw we;
} catch (Exception e) {
log.error("Got Exception Getting Status", e);
FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 1e44a60..46c7974 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -115,7 +115,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStatus(
- tenant, namespace, functionName);
+ tenant, namespace, functionName, uri.getRequestUri());
}
@GET