You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/13 18:29:42 UTC
[incubator-pulsar] branch master updated: Fix: get function status
with auth enable (#2516)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e5a7b53 Fix: get function status with auth enable (#2516)
e5a7b53 is described below
commit e5a7b532ffbf1d8813fc31403162fe3c17e937f5
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Sep 13 11:29:37 2018 -0700
Fix: get function status with auth enable (#2516)
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 2 +-
.../client/admin/internal/FunctionsImpl.java | 5 ++-
.../functions/worker/FunctionRuntimeManager.java | 41 ++++++++--------------
.../functions/worker/rest/api/FunctionsImpl.java | 6 ++--
.../worker/rest/api/v2/FunctionApiV2Resource.java | 2 +-
5 files changed, 23 insertions(+), 33 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index def5452..62d12ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -160,7 +160,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
- tenant, namespace, functionName, instanceId);
+ tenant, namespace, functionName, instanceId, uri.getRequestUri());
}
@GET
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 7dc7050..1d315d7 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -110,9 +110,8 @@ public class FunctionsImpl extends BaseResource implements Functions {
}
}
- @Override
- public FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id)
- throws PulsarAdminException {
+ public FunctionStatus getFunctionStatus(
+ String tenant, String namespace, String function, int id) throws PulsarAdminException {
try {
Response response = request(
functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status"))
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 93828de..ee6eeec 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,6 +26,7 @@ import java.net.URISyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
@@ -248,7 +249,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
* @return the function status
*/
public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace,
- String functionName, int instanceId) {
+ String functionName, int instanceId, URI uri) {
Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();
@@ -306,23 +307,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
return functionStatusBuilder.build();
}
- Client client = ClientBuilder.newClient();
-
- // TODO: implement authentication/authorization
- String jsonResponse = client.target(String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/status",
- workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId))
- .request(MediaType.TEXT_PLAIN)
- .get(String.class);
-
- InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
- try {
- org.apache.pulsar.functions.utils.Utils.mergeJson(jsonResponse, functionStatusBuilder);
- } catch (IOException e) {
- log.warn("Got invalid function status response from {}", workerInfo, e);
- throw new RuntimeException(e);
- }
- functionStatusBuilder.setWorkerId(assignedWorkerId);
- functionStatus = functionStatusBuilder.build();
+ URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
return functionStatus;
@@ -426,9 +412,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @return a list of function statuses
+ * @throws PulsarAdminException
*/
public InstanceCommunication.FunctionStatusList getAllFunctionStatus(
- String tenant, String namespace, String functionName) {
+ String tenant, String namespace, String functionName) throws PulsarAdminException {
Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
@@ -438,13 +425,15 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
for (Assignment assignment : assignments) {
-
- InstanceCommunication.FunctionStatus functionStatus = this.getFunctionInstanceStatus(
- assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
- assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
- assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
- assignment.getInstance().getInstanceId());
-
+ boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
+ InstanceCommunication.FunctionStatus functionStatus = isOwner
+ ? (getFunctionInstanceStatus(tenant, namespace, functionName,
+ assignment.getInstance().getInstanceId(), null))
+ : this.functionAdmin.functions().getFunctionStatus(
+ assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+ assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+ assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+ assignment.getInstance().getInstanceId());
functionStatusListBuilder.addFunctionStatusList(functionStatus);
}
return functionStatusListBuilder.build();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 136bab0..df82c0d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -332,7 +332,7 @@ public class FunctionsImpl {
}
public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
- final String instanceId) throws IOException {
+ final String instanceId, URI uri) throws IOException {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -358,7 +358,9 @@ public class FunctionsImpl {
FunctionStatus functionStatus = null;
try {
functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName,
- Integer.parseInt(instanceId));
+ Integer.parseInt(instanceId), uri);
+ } catch (WebApplicationException we) {
+ throw we;
} catch (Exception e) {
log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName, e);
return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 95fe687..6c6fb18 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -104,7 +104,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
- tenant, namespace, functionName, instanceId);
+ tenant, namespace, functionName, instanceId, uri.getRequestUri());
}
@GET