You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/12/13 14:15:39 UTC
[pulsar] branch master updated: Fix Spellings and Code Cleanup
(#3181)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6df595a Fix Spellings and Code Cleanup (#3181)
6df595a is described below
commit 6df595a341a884404242b8d6dcf0d686e71907ee
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Thu Dec 13 06:15:34 2018 -0800
Fix Spellings and Code Cleanup (#3181)
* Fix Spellings and Code Cleanup
* Fix Code comments
* Remove Tenant check
* Fix test cases
---
.../pulsar/broker/admin/impl/BrokerStatsBase.java | 2 +-
.../pulsar/broker/admin/impl/BrokersBase.java | 2 +-
.../pulsar/broker/admin/impl/ClustersBase.java | 4 +-
.../broker/admin/impl/ResourceQuotasBase.java | 4 +-
.../apache/pulsar/broker/admin/v2/BrokerStats.java | 2 +-
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 2 +-
.../org/apache/pulsar/admin/cli/CmdSources.java | 2 +-
.../apache/pulsar/admin/cli/TestCmdSources.java | 4 +-
.../functions/worker/rest/api/ComponentImpl.java | 246 ++++++++++++++-------
.../functions/worker/rest/api/FunctionsImpl.java | 22 +-
.../worker/rest/api/FunctionsMetricsResource.java | 3 +-
.../pulsar/functions/worker/rest/api/SinkImpl.java | 33 ++-
.../functions/worker/rest/api/SourceImpl.java | 28 ++-
.../functions/worker/rest/api/WorkerImpl.java | 25 ++-
14 files changed, 246 insertions(+), 133 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
index 03fc9a2..43b9ba4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
@@ -87,7 +87,7 @@ public class BrokerStatsBase extends AdminResource {
@GET
@Path("/destinations")
- @ApiOperation(value = "Get all the topic stats by namesapce", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
+ @ApiOperation(value = "Get all the topic stats by namespace", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
// map
// support
// missing
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index c4d504c..900ad7c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -117,7 +117,7 @@ public class BrokersBase extends AdminResource {
@ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
@ApiResponse(code = 404, message = "Configuration not found"),
@ApiResponse(code = 412, message = "Configuration can't be updated dynamically") })
- public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
+ public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception {
validateSuperUserAccess();
updateDynamicConfigurationOnZk(configName, configValue);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 3fcb7d6..b65d268 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -344,7 +344,7 @@ public class ClustersBase extends AdminResource {
.get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
- // construct the response to NamespaceisolationData map
+ // construct the response to Namespace isolation data map
return nsIsolationPolicies.getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
@@ -368,7 +368,7 @@ public class ClustersBase extends AdminResource {
.get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
- // construct the response to NamespaceisolationData map
+ // construct the response to Namespace isolation data map
if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", policyName, cluster);
throw new RestException(Status.NOT_FOUND,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
index 2c664f0..3f502da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
@@ -94,7 +94,7 @@ public abstract class ResourceQuotasBase extends NamespacesBase {
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to set resource quota for namespace bundle {}: concurrent modification",
clientAppId(), nsBundle.toString());
- throw new RestException(Status.CONFLICT, "Cuncurrent modification on namespace bundle quota");
+ throw new RestException(Status.CONFLICT, "Concurrent modification on namespace bundle quota");
} catch (Exception e) {
log.error("[{}] Failed to set resource quota for namespace bundle {}", clientAppId(), nsBundle.toString());
throw new RestException(e);
@@ -123,7 +123,7 @@ public abstract class ResourceQuotasBase extends NamespacesBase {
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to unset resource quota for namespace bundle {}: concurrent modification",
clientAppId(), nsBundle.toString());
- throw new RestException(Status.CONFLICT, "Cuncurrent modification on namespace bundle quota");
+ throw new RestException(Status.CONFLICT, "Concurrent modification on namespace bundle quota");
} catch (Exception e) {
log.error("[{}] Failed to unset resource quota for namespace bundle {}", clientAppId(),
nsBundle.toString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
index 5fd5783..a04fded 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java
@@ -43,7 +43,7 @@ public class BrokerStats extends BrokerStatsBase {
@GET
@Path("/topics")
@ApiOperation(
- value = "Get all the topic stats by namesapce",
+ value = "Get all the topic stats by namespace",
response = OutputStream.class,
responseContainer = "OutputStream")
// https://github.com/swagger-api/swagger-ui/issues/558
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 47ce6e4..5d1b9c0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -86,7 +86,7 @@ public class CmdSinks extends CmdBase {
jcommander.addCommand("delete", deleteSink);
jcommander.addCommand("list", listSinks);
jcommander.addCommand("get", getSink);
- // TODO depecreate getstatus
+ // TODO deprecate getstatus
jcommander.addCommand("status", getSinkStatus, "getstatus");
jcommander.addCommand("stop", stopSink);
jcommander.addCommand("restart", restartSink);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 9ec950a..7acd85b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -382,7 +382,7 @@ public class CmdSources extends CmdBase {
protected void validateSourceConfigs(SourceConfig sourceConfig) {
if (isBlank(sourceConfig.getArchive())) {
- throw new ParameterException("Source archive not specfied");
+ throw new ParameterException("Source archive not specified");
}
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
if (!Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) &&
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 2eb951a..bf9c079 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -178,7 +178,7 @@ public class TestCmdSources {
);
}
- @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied")
+ @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specified")
public void testMissingArchive() throws Exception {
SourceConfig sourceConfig = getSourceConfig();
sourceConfig.setArchive(null);
@@ -356,7 +356,7 @@ public class TestCmdSources {
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}
- @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied")
+ @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specified")
public void testCmdSourceConfigFileMissingJar() throws Exception {
SourceConfig testSourceConfig = getSourceConfig();
testSourceConfig.setArchive(null);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index dd2dce2..fa8f783 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -153,12 +153,16 @@ public abstract class ComponentImpl {
public abstract T notScheduledInstance();
- public abstract T fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String assignedWorkerId);
+ public abstract T fromFunctionStatusProto(final InstanceCommunication.FunctionStatus status,
+ final String assignedWorkerId);
- public abstract T notRunning(String assignedWorkerId, String error);
+ public abstract T notRunning(final String assignedWorkerId, final String error);
- public T getComponentInstanceStatus(String tenant, String namespace,
- String name, int instanceId, URI uri) {
+ public T getComponentInstanceStatus(final String tenant,
+ final String namespace,
+ final String name,
+ final int instanceId,
+ final URI uri) {
Function.Assignment assignment;
if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
@@ -218,16 +222,23 @@ public abstract class ComponentImpl {
}
}
- public abstract S getStatus(String tenant, String namespace,
- String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException;
+ public abstract S getStatus(final String tenant,
+ final String namespace,
+ final String name,
+ final Collection<Function.Assignment> assignments,
+ final URI uri) throws PulsarAdminException;
- public abstract S getStatusExternal(String tenant, String namespace,
- String name, int parallelism);
+ public abstract S getStatusExternal(final String tenant,
+ final String namespace,
+ final String name,
+ final int parallelism);
- public abstract S emptyStatus(int parallelism);
+ public abstract S emptyStatus(final int parallelism);
- public S getComponentStatus(String tenant, String namespace,
- String name, URI uri) {
+ public S getComponentStatus(final String tenant,
+ final String namespace,
+ final String name,
+ final URI uri) {
Function.FunctionMetaData functionMetaData = worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, name);
@@ -291,10 +302,14 @@ public abstract class ComponentImpl {
return true;
}
- public Response registerFunction(final String tenant, final String namespace, final String componentName,
- final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
- final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
-
+ public Response registerFunction(final String tenant,
+ final String namespace,
+ final String componentName,
+ final InputStream uploadedInputStream,
+ final FormDataContentDisposition fileDetail,
+ final String functionPkgUrl,
+ final String functionDetailsJson,
+ final String componentConfigJson,
final String clientRole) {
if (!isWorkerServiceAvailable()) {
@@ -315,9 +330,11 @@ public abstract class ComponentImpl {
}
try {
- TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
- String qualifedNamespace = tenant + "/" + namespace;
- if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifedNamespace)) {
+ // Check tenant exists
+ final TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+ String qualifiedNamespace = tenant + "/" + namespace;
+ if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifiedNamespace)) {
log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace,
componentName, namespace);
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -409,10 +426,10 @@ public abstract class ComponentImpl {
return updateRequest(functionMetaDataBuilder.build());
}
- private PackageLocationMetaData.Builder getFunctionPackageLocation(FunctionDetails functionDetails,
- String functionPkgUrl,
+ private PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionDetails functionDetails,
+ final String functionPkgUrl,
final FormDataContentDisposition fileDetail,
- File uploadedInputStreamAsFile) throws Exception {
+ final File uploadedInputStreamAsFile) throws Exception {
String tenant = functionDetails.getTenant();
String namespace = functionDetails.getNamespace();
String componentName = functionDetails.getName();
@@ -466,9 +483,14 @@ public abstract class ComponentImpl {
}
- public Response updateFunction(final String tenant, final String namespace, final String componentName,
- final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
- final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
+ public Response updateFunction(final String tenant,
+ final String namespace,
+ final String componentName,
+ final InputStream uploadedInputStream,
+ final FormDataContentDisposition fileDetail,
+ final String functionPkgUrl,
+ final String functionDetailsJson,
+ final String componentConfigJson,
final String clientRole) {
if (!isWorkerServiceAvailable()) {
@@ -516,7 +538,7 @@ public abstract class ComponentImpl {
FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
- // The rest end points take precendence over whatever is there in functionconfig
+ // The rest end points take precedence over whatever is there in functionconfig
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(componentName);
@@ -531,7 +553,7 @@ public abstract class ComponentImpl {
SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
existingComponentConfigJson = new Gson().toJson(existingSourceConfig);
SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
- // The rest end points take precendence over whatever is there in functionconfig
+ // The rest end points take precedence over whatever is there in functionconfig
sourceConfig.setTenant(tenant);
sourceConfig.setNamespace(namespace);
sourceConfig.setName(componentName);
@@ -546,7 +568,7 @@ public abstract class ComponentImpl {
SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
existingComponentConfigJson = new Gson().toJson(existingSinkConfig);
SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
- // The rest end points take precendence over whatever is there in functionconfig
+ // The rest end points take precedence over whatever is there in functionconfig
sinkConfig.setTenant(tenant);
sinkConfig.setNamespace(namespace);
sinkConfig.setName(componentName);
@@ -570,7 +592,6 @@ public abstract class ComponentImpl {
if (uploadedInputStream != null) {
uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream);
}
- File existingPackageAsFile = null;
// validate parameters
try {
@@ -581,7 +602,8 @@ public abstract class ComponentImpl {
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType);
} else {
- functionDetails = validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType);
+ functionDetails = validateUpdateRequestParamsWithExistingMetadata(
+ tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType);
}
} catch (Exception e) {
log.error("Invalid update {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
@@ -618,8 +640,10 @@ public abstract class ComponentImpl {
return updateRequest(functionMetaDataBuilder.build());
}
- public Response deregisterFunction(final String tenant, final String namespace, final String componentName,
- String clientRole) {
+ public Response deregisterFunction(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -702,8 +726,9 @@ public abstract class ComponentImpl {
return Response.status(Status.OK).entity(requestResult.toJson()).build();
}
- public Response getFunctionInfo(final String tenant, final String namespace, final String componentName)
- throws IOException {
+ public Response getFunctionInfo(final String tenant,
+ final String namespace,
+ final String componentName) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -731,32 +756,42 @@ public abstract class ComponentImpl {
.entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
}
- String retval;
+ String retVal;
if (componentType.equals(FUNCTION)) {
FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
- retval = new Gson().toJson(config);
+ retVal = new Gson().toJson(config);
} else if (componentType.equals(SOURCE)) {
SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
- retval = new Gson().toJson(config);
+ retVal = new Gson().toJson(config);
} else {
SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
- retval = new Gson().toJson(config);
+ retVal = new Gson().toJson(config);
}
- return Response.status(Status.OK).entity(retval).build();
+ return Response.status(Status.OK).entity(retVal).build();
}
- public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
- final String instanceId, URI uri) {
+ public Response stopFunctionInstance(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String instanceId,
+ final URI uri) {
return stopFunctionInstance(tenant, namespace, componentName, instanceId, false, uri);
}
- public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName,
- final String instanceId, URI uri) {
+ public Response restartFunctionInstance(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String instanceId,
+ final URI uri) {
return stopFunctionInstance(tenant, namespace, componentName, instanceId, true, uri);
}
- public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
- final String instanceId, boolean restart, URI uri) {
+ public Response stopFunctionInstance(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String instanceId,
+ final boolean restart,
+ final URI uri) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -797,16 +832,22 @@ public abstract class ComponentImpl {
}
}
- public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName) {
+ public Response stopFunctionInstances(final String tenant,
+ final String namespace,
+ final String componentName) {
return stopFunctionInstances(tenant, namespace, componentName, false);
}
- public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName) {
+ public Response restartFunctionInstances(final String tenant,
+ final String namespace,
+ final String componentName) {
return stopFunctionInstances(tenant, namespace, componentName, true);
}
- public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
- boolean restart) {
+ public Response stopFunctionInstances(final String tenant,
+ final String namespace,
+ final String componentName,
+ final boolean restart) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -846,8 +887,10 @@ public abstract class ComponentImpl {
}
}
- public FunctionStats getFunctionStats(final String tenant, final String namespace, final String componentName,
- URI uri) throws IOException {
+ public FunctionStats getFunctionStats(final String tenant,
+ final String namespace,
+ final String componentName,
+ final URI uri) {
if (!isWorkerServiceAvailable()) {
throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
}
@@ -887,8 +930,11 @@ public abstract class ComponentImpl {
}
- public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant, final String namespace, final String componentName,
- String instanceId, URI uri) {
+ public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String instanceId,
+ final URI uri) {
if (!isWorkerServiceAvailable()) {
throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
}
@@ -997,14 +1043,17 @@ public abstract class ComponentImpl {
return this.worker().getConnectorsManager().getConnectors();
}
- public Response triggerFunction(final String tenant, final String namespace, final String functionName,
- final String input, final InputStream uploadedInputStream, final String topic) {
+ public Response triggerFunction(final String tenant,
+ final String namespace,
+ final String functionName,
+ final String input,
+ final InputStream uploadedInputStream,
+ final String topic) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
- FunctionDetails functionDetails;
// validate parameters
try {
validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
@@ -1098,8 +1147,10 @@ public abstract class ComponentImpl {
}
}
- public Response getFunctionState(final String tenant, final String namespace,
- final String functionName, final String key) {
+ public Response getFunctionState(final String tenant,
+ final String namespace,
+ final String functionName,
+ final String key) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
@@ -1209,7 +1260,7 @@ public abstract class ComponentImpl {
}).build();
}
- private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
+ private void validateListFunctionRequestParams(final String tenant, final String namespace) throws IllegalArgumentException {
if (tenant == null) {
throw new IllegalArgumentException("Tenant is not provided");
@@ -1219,8 +1270,11 @@ public abstract class ComponentImpl {
}
}
- protected void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName,
- ComponentType componentType, String instanceId) throws IllegalArgumentException {
+ protected void validateGetFunctionInstanceRequestParams(final String tenant,
+ final String namespace,
+ final String componentName,
+ final ComponentType componentType,
+ final String instanceId) throws IllegalArgumentException {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
if (instanceId == null) {
throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType));
@@ -1255,10 +1309,14 @@ public abstract class ComponentImpl {
}
}
- private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String componentName,
- String functionPkgUrl, String functionDetailsJson, String componentConfigJson,
- ComponentType componentType)
- throws IllegalArgumentException, IOException, URISyntaxException {
+ private FunctionDetails validateUpdateRequestParamsWithPkgUrl(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String functionPkgUrl,
+ final String functionDetailsJson,
+ final String componentConfigJson,
+ final ComponentType componentType)
+ throws IllegalArgumentException, IOException {
if (!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
}
@@ -1267,9 +1325,14 @@ public abstract class ComponentImpl {
return functionDetails;
}
- private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
- File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
- String componentConfigJson, ComponentType componentType)
+ private FunctionDetails validateUpdateRequestParams(final String tenant,
+ final String namespace,
+ final String componentName,
+ final File uploadedInputStreamAsFile,
+ final FormDataContentDisposition fileDetail,
+ final String functionDetailsJson,
+ final String componentConfigJson,
+ final ComponentType componentType)
throws IllegalArgumentException, IOException {
FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
@@ -1281,9 +1344,12 @@ public abstract class ComponentImpl {
return functionDetails;
}
- private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(String tenant, String namespace, String componentName,
- PackageLocationMetaData packageLocationMetaData,
- String componentConfigJson, ComponentType componentType) throws Exception {
+ private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(final String tenant,
+ final String namespace,
+ final String componentName,
+ final PackageLocationMetaData packageLocationMetaData,
+ final String componentConfigJson,
+ final ComponentType componentType) throws Exception {
File tmpFile = File.createTempFile("functions", null);
tmpFile.deleteOnExit();
Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath());
@@ -1291,7 +1357,7 @@ public abstract class ComponentImpl {
null, componentConfigJson, componentType, null, tmpFile);
}
- private static File dumpToTmpFile(InputStream uploadedInputStream) {
+ private static File dumpToTmpFile(final InputStream uploadedInputStream) {
try {
File tmpFile = File.createTempFile("functions", null);
tmpFile.deleteOnExit();
@@ -1302,7 +1368,10 @@ public abstract class ComponentImpl {
}
}
- private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key)
+ private void validateGetFunctionStateParams(final String tenant,
+ final String namespace,
+ final String functionName,
+ final String key)
throws IllegalArgumentException {
if (tenant == null) {
@@ -1355,9 +1424,14 @@ public abstract class ComponentImpl {
return null;
}
- private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
- String functionDetailsJson, String componentConfigJson, ComponentType componentType,
- String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException {
+ private FunctionDetails validateUpdateRequestParams(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String functionDetailsJson,
+ final String componentConfigJson,
+ final ComponentType componentType,
+ final String functionPkgUrl,
+ final File uploadedInputStreamAsFile) throws IOException {
if (tenant == null) {
throw new IllegalArgumentException("Tenant is not provided");
}
@@ -1370,7 +1444,7 @@ public abstract class ComponentImpl {
if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
- // The rest end points take precendence over whatever is there in functionconfig
+ // The rest end points take precedence over whatever is there in functionconfig
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(componentName);
@@ -1381,7 +1455,7 @@ public abstract class ComponentImpl {
if (componentType.equals(SOURCE)) {
Path archivePath = null;
SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
- // The rest end points take precendence over whatever is there in sourceconfig
+ // The rest end points take precedence over whatever is there in sourceconfig
sourceConfig.setTenant(tenant);
sourceConfig.setNamespace(namespace);
sourceConfig.setName(componentName);
@@ -1403,7 +1477,7 @@ public abstract class ComponentImpl {
if (componentType.equals(SINK)) {
Path archivePath = null;
SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
- // The rest end points take precendence over whatever is there in sinkConfig
+ // The rest end points take precedence over whatever is there in sinkConfig
sinkConfig.setTenant(tenant);
sinkConfig.setNamespace(namespace);
sinkConfig.setName(componentName);
@@ -1562,8 +1636,14 @@ public abstract class ComponentImpl {
return TypeResolver.resolveRawArgument(funClass, loadedClass);
}
- private void validateTriggerRequestParams(String tenant, String namespace, String functionName, String topic,
- String input, InputStream uploadedInputStream) {
+ private void validateTriggerRequestParams(final String tenant,
+ final String namespace,
+ final String functionName,
+ final String topic,
+ final String input,
+ final InputStream uploadedInputStream) {
+ // Note : Checking topic is not required it can be null
+
if (tenant == null) {
throw new IllegalArgumentException("Tenant is not provided");
}
@@ -1660,8 +1740,10 @@ public abstract class ComponentImpl {
}
}
- protected void componentInstanceStatusRequestValidate (final String tenant, final String namespace,
- final String componentName, int instanceId) {
+ protected void componentInstanceStatusRequestValidate (final String tenant,
+ final String namespace,
+ final String componentName,
+ final int instanceId) {
componentStatusRequestValidate(tenant, namespace, componentName);
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 9a6f739..f3e41a1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -24,13 +24,11 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
-import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
@@ -155,7 +153,10 @@ public class FunctionsImpl extends ComponentImpl {
}
@Override
- public FunctionStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
+ public FunctionStatus getStatusExternal(final String tenant,
+ final String namespace,
+ final String name,
+ final int parallelism) {
FunctionStatus functionStatus = new FunctionStatus();
for (int i = 0; i < parallelism; ++i) {
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
@@ -177,7 +178,7 @@ public class FunctionsImpl extends ComponentImpl {
}
@Override
- public FunctionStatus emptyStatus(int parallelism) {
+ public FunctionStatus emptyStatus(final int parallelism) {
FunctionStatus functionStatus = new FunctionStatus();
functionStatus.setNumInstances(parallelism);
functionStatus.setNumRunning(0);
@@ -209,8 +210,11 @@ public class FunctionsImpl extends ComponentImpl {
* @param instanceId the function instance id
* @return the function status
*/
- public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName,
- final String instanceId, URI uri) throws IOException {
+ public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant,
+ final String namespace,
+ final String componentName,
+ final String instanceId,
+ final URI uri) {
// validate parameters
componentInstanceStatusRequestValidate(tenant, namespace, componentName, Integer.parseInt(instanceId));
@@ -237,8 +241,10 @@ public class FunctionsImpl extends ComponentImpl {
* @return a list of function statuses
* @throws PulsarAdminException
*/
- public FunctionStatus getFunctionStatus(final String tenant, final String namespace, final String componentName,
- URI uri) {
+ public FunctionStatus getFunctionStatus(final String tenant,
+ final String namespace,
+ final String componentName,
+ final URI uri) {
// validate parameters
componentStatusRequestValidate(tenant, namespace, componentName);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
index b94dfd8..9d7b316 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
@@ -38,7 +37,7 @@ public class FunctionsMetricsResource extends FunctionApiResource {
@Path("metrics")
@GET
@Produces(MediaType.TEXT_PLAIN)
- public Response getMetrics() throws JsonProcessingException {
+ public Response getMetrics() {
WorkerService workerService = get();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 6d70fbb..862aaf6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -24,13 +24,11 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
-import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
@@ -123,13 +121,18 @@ public class SinkImpl extends ComponentImpl {
}
@Override
- public SinkStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
+ public SinkStatus getStatus(final String tenant,
+ final String namespace,
+ final String name,
+ final Collection<Function.Assignment> assignments,
+ final URI uri) throws PulsarAdminException {
SinkStatus sinkStatus = new SinkStatus();
for (Function.Assignment assignment : assignments) {
boolean isOwner = worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData;
if (isOwner) {
- sinkInstanceStatusData = getComponentInstanceStatus(tenant, namespace, name, assignment.getInstance().getInstanceId(), null);
+ sinkInstanceStatusData = getComponentInstanceStatus(tenant,
+ namespace, name, assignment.getInstance().getInstanceId(), null);
} else {
sinkInstanceStatusData = worker().getFunctionAdmin().sink().getSinkStatus(
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
@@ -154,7 +157,10 @@ public class SinkImpl extends ComponentImpl {
}
@Override
- public SinkStatus getStatusExternal (String tenant, String namespace, String name, int parallelism) {
+ public SinkStatus getStatusExternal(final String tenant,
+ final String namespace,
+ final String name,
+ final int parallelism) {
SinkStatus sinkStatus = new SinkStatus();
for (int i = 0; i < parallelism; ++i) {
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData
@@ -176,7 +182,7 @@ public class SinkImpl extends ComponentImpl {
}
@Override
- public SinkStatus emptyStatus(int parallelism) {
+ public SinkStatus emptyStatus(final int parallelism) {
SinkStatus sinkStatus = new SinkStatus();
sinkStatus.setNumInstances(parallelism);
sinkStatus.setNumRunning(0);
@@ -200,9 +206,11 @@ public class SinkImpl extends ComponentImpl {
super(workerServiceSupplier, ComponentType.SINK);
}
- public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(
- String tenant, String namespace, String sinkName, String instanceId, URI uri)
- throws IOException {
+ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant,
+ final String namespace,
+ final String sinkName,
+ final String instanceId,
+ final URI uri) {
// validate parameters
componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId));
@@ -221,9 +229,10 @@ public class SinkImpl extends ComponentImpl {
return sinkInstanceStatusData;
}
- public SinkStatus getSinkStatus(
- final String tenant, final String namespace,
- final String componentName, URI uri) {
+ public SinkStatus getSinkStatus(final String tenant,
+ final String namespace,
+ final String componentName,
+ final URI uri) {
// validate parameters
componentStatusRequestValidate(tenant, namespace, componentName);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index 8c3c988..d915bb0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -24,13 +24,11 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
-import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
@@ -122,7 +120,11 @@ public class SourceImpl extends ComponentImpl {
}
@Override
- public SourceStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
+ public SourceStatus getStatus(final String tenant,
+ final String namespace,
+ final String name,
+ final Collection<Function.Assignment> assignments,
+ final URI uri) throws PulsarAdminException {
SourceStatus sourceStatus = new SourceStatus();
for (Function.Assignment assignment : assignments) {
boolean isOwner = worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
@@ -153,7 +155,10 @@ public class SourceImpl extends ComponentImpl {
}
@Override
- public SourceStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
+ public SourceStatus getStatusExternal(final String tenant,
+ final String namespace,
+ final String name,
+ final int parallelism) {
SourceStatus sinkStatus = new SourceStatus();
for (int i = 0; i < parallelism; ++i) {
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData
@@ -175,7 +180,7 @@ public class SourceImpl extends ComponentImpl {
}
@Override
- public SourceStatus emptyStatus(int parallelism) {
+ public SourceStatus emptyStatus(final int parallelism) {
SourceStatus sourceStatus = new SourceStatus();
sourceStatus.setNumInstances(parallelism);
sourceStatus.setNumRunning(0);
@@ -199,8 +204,10 @@ public class SourceImpl extends ComponentImpl {
super(workerServiceSupplier, ComponentType.SOURCE);
}
- public SourceStatus getSourceStatus(final String tenant, final String namespace,
- final String componentName, URI uri) throws IOException {
+ public SourceStatus getSourceStatus(final String tenant,
+ final String namespace,
+ final String componentName,
+ final URI uri) {
// validate parameters
componentStatusRequestValidate(tenant, namespace, componentName);
@@ -217,8 +224,11 @@ public class SourceImpl extends ComponentImpl {
return sourceStatus;
}
- public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(
- String tenant, String namespace, String sourceName, String instanceId, URI uri) {
+ public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(final String tenant,
+ final String namespace,
+ final String sourceName,
+ final String instanceId,
+ final URI uri) {
// validate parameters
componentInstanceStatusRequestValidate(tenant, namespace, sourceName, Integer.parseInt(instanceId));
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index bafb7a9..56c945d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -27,14 +27,20 @@ import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.worker.*;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.MembershipManager;
+import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerService;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-import java.io.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -118,11 +124,11 @@ public class WorkerImpl {
.build();
}
- public boolean isSuperUser(String clientRole) {
+ public boolean isSuperUser(final String clientRole) {
return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
}
- public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(String clientRole) throws IOException {
+ public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(final String clientRole) {
if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
@@ -135,12 +141,12 @@ public class WorkerImpl {
if (!isWorkerServiceAvailable()) {
throw new WebApplicationException(
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData("Function worker service is not avaialable")).build());
+ .entity(new ErrorData("Function worker service is not available")).build());
}
return worker().getMetricsGenerator().generate();
}
- public Response getFunctionsMetrics(String clientRole) throws IOException {
+ public Response getFunctionsMetrics(final String clientRole) {
if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
@@ -156,7 +162,7 @@ public class WorkerImpl {
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics;
}
- private Response getFunctionsMetrics() throws IOException {
+ private Response getFunctionsMetrics() {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
@@ -171,7 +177,8 @@ public class WorkerImpl {
String fullyQualifiedInstanceName = entry.getKey();
FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
- FunctionStats.FunctionInstanceStats functionInstanceStats = Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
+ FunctionStats.FunctionInstanceStats functionInstanceStats =
+ Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);