You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/12/14 05:17:39 UTC
[pulsar] branch master updated: Cleaning up and improving worker
endpoints (#3191)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 23a6622 Cleaning up and improving worker endpoints (#3191)
23a6622 is described below
commit 23a6622dd57aac1a02cedb26bdfcc8ef19564886
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Dec 13 21:17:34 2018 -0800
Cleaning up and improving worker endpoints (#3191)
---
.../org/apache/pulsar/broker/admin/v2/Worker.java | 53 ++++++++-------
.../apache/pulsar/broker/admin/v2/WorkerStats.java | 33 +++++++---
.../org/apache/pulsar/client/admin/Worker.java | 6 +-
.../pulsar/client/admin/internal/WorkerImpl.java | 76 +++++++++++-----------
.../org/apache/pulsar/admin/cli/CliCommand.java | 8 +++
.../apache/pulsar/admin/cli/CmdFunctionWorker.java | 69 +++-----------------
.../policies/data/WorkerFunctionInstanceStats.java | 29 +++++++++
.../functions/worker/rest/api/WorkerImpl.java | 65 +++++++-----------
.../worker/rest/api/v2/WorkerApiV2Resource.java | 69 ++++++++++++--------
.../rest/api/v2/WorkerStatsApiV2Resource.java | 34 +++++++---
10 files changed, 228 insertions(+), 214 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
index 46427cb..a40132d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -18,24 +18,24 @@
*/
package org.apache.pulsar.broker.admin.v2;
-import java.util.function.Supplier;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.worker.WorkerService;
-
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
@Slf4j
@Path("/worker")
public class Worker extends AdminResource implements Supplier<WorkerService> {
@@ -53,42 +53,47 @@ public class Worker extends AdminResource implements Supplier<WorkerService> {
@GET
@ApiOperation(
- value = "Fetches information about the Pulsar cluster running Pulsar Functions"
+ value = "Fetches information about the Pulsar cluster running Pulsar Functions",
+ response = WorkerInfo.class,
+ responseContainer = "List"
)
@ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
-
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
})
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
- public Response getCluster() {
+ public List<WorkerInfo> getCluster() {
return worker.getCluster();
}
@GET
@ApiOperation(
- value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions")
+ value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions",
+ response = WorkerInfo.class
+ )
@ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
-
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
})
@Path("/cluster/leader")
@Produces(MediaType.APPLICATION_JSON)
- public Response getClusterLeader() {
+ public WorkerInfo getClusterLeader() {
return worker.getClusterLeader();
}
@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
- response = Function.Assignment.class,
- responseContainer = "Map"
+ response = Map.class
)
@ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
})
@Path("/assignments")
- public Response getAssignments() {
+ @Produces(MediaType.APPLICATION_JSON)
+ public Map<String, Collection<String>> getAssignments() {
return worker.getAssignments();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index a0ec3fe..a5146df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -23,15 +23,17 @@ import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
import java.util.function.Supplier;
@Slf4j
@@ -51,18 +53,33 @@ public class WorkerStats extends AdminResource implements Supplier<WorkerService
@GET
@Path("/metrics")
- @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
- @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
+ @ApiOperation(
+ value = "Gets the metrics for Monitoring",
+ notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics",
+ response = org.apache.pulsar.common.stats.Metrics.class,
+ responseContainer = "List")
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Don't have admin permission"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
+ })
+ @Produces(MediaType.APPLICATION_JSON)
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
return worker.getWorkerMetrics(clientAppId());
}
@GET
@Path("/functionsmetrics")
- @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 503, message = "Worker service is not running") })
- public Response getStats() throws IOException {
+ @ApiOperation(
+ value = "Get metrics for all functions owned by worker",
+ notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics",
+ response = WorkerFunctionInstanceStats.class,
+ responseContainer = "List")
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Don't have admin permission"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
+ })
+ @Produces(MediaType.APPLICATION_JSON)
+ public List<WorkerFunctionInstanceStats> getStats() throws IOException {
return worker.getFunctionsMetrics(clientAppId());
}
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
index 37e1bac..b21f326 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
@@ -22,8 +22,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
/**
* Admin interface for worker stats management.
@@ -36,8 +36,8 @@ public interface Worker {
* @return
* @throws PulsarAdminException
*/
- Metrics getFunctionsStats() throws PulsarAdminException;
-
+ List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException;
+
/**
* Get worker metrics.
* @return
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index c2a9d13..3df818a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -18,27 +18,20 @@
*/
package org.apache.pulsar.client.admin.internal;
-import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
-
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Worker;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.Worker;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.functions.WorkerInfo;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
@Slf4j
public class WorkerImpl extends BaseResource implements Worker {
@@ -53,27 +46,28 @@ public class WorkerImpl extends BaseResource implements Worker {
}
@Override
- public Metrics getFunctionsStats() throws PulsarAdminException {
+ public List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException {
try {
Response response = request(workerStats.path("functionsmetrics")).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw new ClientErrorException(response);
- }
- String jsonResponse = response.readEntity(String.class);
- Metrics.Builder metricsBuilder = Metrics.newBuilder();
- mergeJson(jsonResponse, metricsBuilder);
- return metricsBuilder.build();
- } catch (Exception e) {
- throw getApiException(e);
- }
- }
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ List<WorkerFunctionInstanceStats> metricsList
+ = response.readEntity(new GenericType<List<WorkerFunctionInstanceStats>>() {});
+ return metricsList;
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
@Override
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException {
try {
- return request(workerStats.path("metrics"))
- .get(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {
- });
+ Response response = request(workerStats.path("metrics")).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ return response.readEntity(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {});
} catch (Exception e) {
throw getApiException(e);
}
@@ -82,9 +76,11 @@ public class WorkerImpl extends BaseResource implements Worker {
@Override
public List<WorkerInfo> getCluster() throws PulsarAdminException {
try {
- return request(worker.path("cluster"))
- .get(new GenericType<List<WorkerInfo>>() {
- });
+ Response response = request(worker.path("cluster")).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ return response.readEntity(new GenericType<List<WorkerInfo>>() {});
} catch (Exception e) {
throw getApiException(e);
}
@@ -93,8 +89,11 @@ public class WorkerImpl extends BaseResource implements Worker {
@Override
public WorkerInfo getClusterLeader() throws PulsarAdminException {
try {
- return request(worker.path("cluster").path("leader"))
- .get(new GenericType<WorkerInfo>(){});
+ Response response = request(worker.path("cluster").path("leader")).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ return response.readEntity(new GenericType<WorkerInfo>(){});
} catch (Exception e) {
throw getApiException(e);
}
@@ -107,9 +106,8 @@ public class WorkerImpl extends BaseResource implements Worker {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
- String jsonResponse = response.readEntity(String.class);
- Type type = new TypeToken<Map<String, Collection<String>>>(){}.getType();
- Map<String, Collection<String>> assignments = new Gson().fromJson(jsonResponse, type);
+ Map<String, Collection<String>> assignments
+ = response.readEntity(new GenericType<Map<String, Collection<String>>>() {});
return assignments;
} catch (Exception e) {
throw getApiException(e);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index 99d27d8..7a3d150 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -175,6 +175,14 @@ abstract class CliCommand {
}
}
+ <T> void printList(T item) {
+ try {
+ System.out.println(writer.writeValueAsString(item));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
<T> void print(T item) {
try {
System.out.println(writer.writeValueAsString(item));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
index 324a028..6a166c7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
@@ -18,23 +18,11 @@
*/
package org.apache.pulsar.admin.cli;
-import com.google.protobuf.util.JsonFormat;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParser;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.functions.WorkerInfo;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClientException;
@Slf4j
@Parameters(commandDescription = "Operations to collect function-worker statistics")
@@ -57,88 +45,49 @@ public class CmdFunctionWorker extends CmdBase {
abstract void runCmd() throws Exception;
}
- @Parameters(commandDescription = "dump all functions stats")
+ @Parameters(commandDescription = "Dump all functions stats running on this broker")
class FunctionsStats extends BaseCommand {
- @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false)
- boolean indent = false;
-
@Override
void runCmd() throws Exception {
- String json = JsonFormat.printer().print(admin.worker().getFunctionsStats());
- GsonBuilder gsonBuilder = new GsonBuilder();
- if (indent) {
- gsonBuilder.setPrettyPrinting();
- }
- System.out.println(gsonBuilder.create().toJson(new JsonParser().parse(json)));
+ printList(admin.worker().getFunctionsStats());
}
}
- @Parameters(commandDescription = "dump metrics for Monitoring")
+ @Parameters(commandDescription = "Dump metrics for Monitoring")
class CmdMonitoringMetrics extends BaseCommand {
- @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false)
- boolean indent = false;
-
@Override
void runCmd() throws Exception {
- String json = new Gson().toJson(admin.worker().getMetrics());
- GsonBuilder gsonBuilder = new GsonBuilder();
- if (indent) {
- gsonBuilder.setPrettyPrinting();
- }
- System.out.println(gsonBuilder.create().toJson(new JsonParser().parse(json)));
+ printList(admin.worker().getMetrics());
}
}
@Parameters(commandDescription = "Get all workers belonging to this cluster")
class GetCluster extends BaseCommand {
- @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false)
- boolean indent = false;
-
@Override
void runCmd() throws Exception {
- List<WorkerInfo> workers = admin.worker().getCluster();
- GsonBuilder gsonBuilder = new GsonBuilder();
- if (indent) {
- gsonBuilder.setPrettyPrinting();
- }
- System.out.println(gsonBuilder.create().toJson(workers));
+ printList(admin.worker().getCluster());
}
}
@Parameters(commandDescription = "Get the leader of the worker cluster")
class GetClusterLeader extends BaseCommand {
- @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false)
- boolean indent = false;
-
@Override
void runCmd() throws Exception {
- WorkerInfo leader = admin.worker().getClusterLeader();
- GsonBuilder gsonBuilder = new GsonBuilder();
- if (indent) {
- gsonBuilder.setPrettyPrinting();
- }
- System.out.println(gsonBuilder.create().toJson(leader));
+ print(admin.worker().getClusterLeader());
}
}
@Parameters(commandDescription = "Get the assignments of the functions accross the worker cluster")
class GetFunctionAssignments extends BaseCommand {
- @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false)
- boolean indent = false;
@Override
void runCmd() throws Exception {
- Map<String, Collection<String>> assignments = admin.worker().getAssignments();
- GsonBuilder gsonBuilder = new GsonBuilder();
- if (indent) {
- gsonBuilder.setPrettyPrinting();
- }
- System.out.println(gsonBuilder.create().toJson(assignments));
+ print(admin.worker().getAssignments());
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java
new file mode 100644
index 0000000..86079bb
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.Data;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+
+@Data
+public class WorkerFunctionInstanceStats {
+ /** fully qualified function instance name **/
+ public String name;
+ public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics;
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 56c945d..6351272 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -18,25 +18,25 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -50,8 +50,6 @@ public class WorkerImpl {
private final Supplier<WorkerService> workerServiceSupplier;
- private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
-
public WorkerImpl(Supplier<WorkerService> workerServiceSupplier) {
this.workerServiceSupplier = workerServiceSupplier;
}
@@ -76,36 +74,33 @@ public class WorkerImpl {
return true;
}
- public Response getCluster() {
+ public List<WorkerInfo> getCluster() {
if (!isWorkerServiceAvailable()) {
- return getUnavailableResponse();
+ throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
}
List<WorkerInfo> workers = worker().getMembershipManager().getCurrentMembership();
- String jsonString = new Gson().toJson(workers);
- return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build();
+ return workers;
}
- public Response getClusterLeader() {
+ public WorkerInfo getClusterLeader() {
if (!isWorkerServiceAvailable()) {
- return getUnavailableResponse();
+ throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
}
MembershipManager membershipManager = worker().getMembershipManager();
WorkerInfo leader = membershipManager.getLeader();
if (leader == null) {
- return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData("Leader cannot be determined")).build();
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, "Leader cannot be determined");
}
- String jsonString = new Gson().toJson(leader);
- return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build();
+ return leader;
}
- public Response getAssignments() {
+ public Map<String, Collection<String>> getAssignments() {
if (!isWorkerServiceAvailable()) {
- return getUnavailableResponse();
+ throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -114,14 +109,7 @@ public class WorkerImpl {
for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) {
ret.put(entry.getKey(), entry.getValue().keySet());
}
- return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(new Gson().toJson(ret)).build();
- }
-
- private Response getUnavailableResponse() {
- return Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(
- "Function worker service is not done initializing. " + "Please try again in a little while."))
- .build();
+ return ret;
}
public boolean isSuperUser(final String clientRole) {
@@ -146,32 +134,25 @@ public class WorkerImpl {
return worker().getMetricsGenerator().generate();
}
- public Response getFunctionsMetrics(final String clientRole) {
+ public List<WorkerFunctionInstanceStats> getFunctionsMetrics(String clientRole) throws IOException {
+
if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
- return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData("client is not authorize to perform operation")).build();
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
return getFunctionsMetrics();
}
- @Data
- public static class WorkerFunctionInstanceStats {
- /** fully qualified function instance name **/
- public String name;
- public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics;
- }
-
- private Response getFunctionsMetrics() {
+ private List<WorkerFunctionInstanceStats> getFunctionsMetrics() throws IOException {
if (!isWorkerServiceAvailable()) {
- return getUnavailableResponse();
+ throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
}
WorkerService workerService = worker();
Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager()
.getFunctionRuntimeInfos();
- JsonArray metricsMapList = new JsonArray();
+ List<WorkerFunctionInstanceStats> metricsList = new ArrayList<>(functionRuntimes.size());
for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
String fullyQualifiedInstanceName = entry.getKey();
@@ -184,10 +165,8 @@ public class WorkerImpl {
workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
- metricsMapList.add(gson.toJsonTree(workerFunctionInstanceStats));
+ metricsList.add(workerFunctionInstanceStats);
}
- String jsonResponse = gson.toJson(metricsMapList);
-
- return Response.status(Status.OK).entity(jsonResponse).build();
+ return metricsList;
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
index 8cf0797..29d3ff0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
@@ -18,7 +18,15 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v2;
-import java.util.function.Supplier;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
@@ -28,18 +36,10 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.broker.web.AuthenticationFilter;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.worker.WorkerService;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
@Slf4j
@Path("/worker")
@@ -76,33 +76,48 @@ public class WorkerApiV2Resource implements Supplier<WorkerService> {
}
@GET
+ @ApiOperation(
+ value = "Fetches information about the Pulsar cluster running Pulsar Functions",
+ response = WorkerInfo.class,
+ responseContainer = "List"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
+ })
@Path("/cluster")
- @ApiOperation(value = "Fetches information about the Pulsar cluster running Pulsar Functions")
- @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
- @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") })
@Produces(MediaType.APPLICATION_JSON)
- public Response getCluster() {
+ public List<WorkerInfo> getCluster() {
return worker.getCluster();
}
@GET
+ @ApiOperation(
+ value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions",
+ response = WorkerInfo.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
+ })
@Path("/cluster/leader")
- @ApiOperation(value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions")
- @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
- @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") })
@Produces(MediaType.APPLICATION_JSON)
- public Response getClusterLeader() {
+ public WorkerInfo getClusterLeader() {
return worker.getClusterLeader();
}
@GET
+ @ApiOperation(
+ value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
+ response = Map.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
+ })
@Path("/assignments")
- @ApiOperation(value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
- response = Function.Assignment.class,
- responseContainer = "Map")
- @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
- @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") })
- public Response getAssignments() {
+ @Produces(MediaType.APPLICATION_JSON)
+ public Map<String, Collection<String>> getAssignments() {
return worker.getAssignments();
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
index 022ebd3..bd404d6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
@@ -24,7 +24,7 @@ import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.web.AuthenticationFilter;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
@@ -36,9 +36,8 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
import java.io.IOException;
-import java.util.Collection;
+import java.util.List;
import java.util.function.Supplier;
@Slf4j
@@ -77,18 +76,33 @@ public class WorkerStatsApiV2Resource implements Supplier<WorkerService> {
@GET
@Path("/metrics")
- @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
- @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
- public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
+ @ApiOperation(
+ value = "Gets the metrics for Monitoring",
+ notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics",
+ response = org.apache.pulsar.common.stats.Metrics.class,
+ responseContainer = "List")
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Don't have admin permission"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
+ })
+ @Produces(MediaType.APPLICATION_JSON)
+ public List<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
return worker.getWorkerMetrics(clientAppId());
}
@GET
@Path("/functionsmetrics")
- @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = WorkerImpl.WorkerFunctionInstanceStats.class, responseContainer = "List")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 503, message = "Worker service is not running") })
- public Response getFunctionsMetrics() throws IOException {
+ @ApiOperation(
+ value = "Get metrics for all functions owned by worker",
+ notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics",
+ response = WorkerFunctionInstanceStats.class,
+ responseContainer = "List")
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Don't have admin permission"),
+ @ApiResponse(code = 503, message = "Worker service is not running")
+ })
+ @Produces(MediaType.APPLICATION_JSON)
+ public List<WorkerFunctionInstanceStats> getStats() throws IOException {
return worker.getFunctionsMetrics(clientAppId());
}
}