You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/12/08 00:26:34 UTC
[pulsar] branch master updated: [CheckStyle] Make codestyle of
module `pulsar-broker` gradually conform to CheckStyle (#8848)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0e89f96 [CheckStyle] Make codestyle of module `pulsar-broker` gradually conform to CheckStyle (#8848)
0e89f96 is described below
commit 0e89f967dda9718b5d00d0daa982b14d1d55bffd
Author: Renkai Ge <ga...@gmail.com>
AuthorDate: Tue Dec 8 08:26:05 2020 +0800
[CheckStyle] Make codestyle of module `pulsar-broker` gradually conform to CheckStyle (#8848)
* add provided jclouds
Signed-off-by: Renkai <ga...@gmail.com>
* make broker-style gradually conform checkstyle
Signed-off-by: Renkai <ga...@gmail.com>
* make broker-style gradually conform checkstyle
Signed-off-by: Renkai <ga...@gmail.com>
* make broker-style gradually conform checkstyle
Signed-off-by: Renkai <ga...@gmail.com>
* make broker-style gradually conform checkstyle
Signed-off-by: Renkai <ga...@gmail.com>
---
.../src/main/resources/pulsar/suppressions.xml | 1 -
.../pulsar/broker/admin/impl/BrokerStatsBase.java | 32 +-
.../pulsar/broker/admin/impl/BrokersBase.java | 28 +-
.../pulsar/broker/admin/impl/ClustersBase.java | 15 +-
.../pulsar/broker/admin/impl/FunctionsBase.java | 425 ++++++++--------
.../pulsar/broker/admin/impl/NamespacesBase.java | 436 ++++++++++-------
.../broker/admin/impl/PersistentTopicsBase.java | 532 +++++++++++++--------
.../apache/pulsar/broker/admin/impl/SinksBase.java | 320 +++++++------
.../pulsar/broker/admin/impl/SourcesBase.java | 200 ++++----
.../pulsar/broker/admin/impl/TenantsBase.java | 11 +-
10 files changed, 1160 insertions(+), 840 deletions(-)
diff --git a/buildtools/src/main/resources/pulsar/suppressions.xml b/buildtools/src/main/resources/pulsar/suppressions.xml
index 66b3a4e..3294a87 100644
--- a/buildtools/src/main/resources/pulsar/suppressions.xml
+++ b/buildtools/src/main/resources/pulsar/suppressions.xml
@@ -48,7 +48,6 @@
<suppress checks="ConstantName" files="MessageId.java"/>
<suppress checks="MethodName" files="TopicsImpl.java"/>
<suppress checks="MemberName" files="TopicsImpl.java"/>
- <suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/admin/impl/.*.java"/>
<suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/cache/.*.java"/>
<suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/delayed/.*.java"/>
<suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/intercept/.*.java"/>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
index 28a4934..e92b4dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
@@ -52,7 +52,9 @@ public class BrokerStatsBase extends AdminResource {
@GET
@Path("/metrics")
- @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Requested should be executed by Monitoring agent on each broker to fetch the metrics", response = Metrics.class, responseContainer = "List")
+ @ApiOperation(value = "Gets the metrics for Monitoring",
+ notes = "Requested should be executed by Monitoring agent on each broker to fetch the metrics",
+ response = Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public Collection<Metrics> getMetrics() throws Exception {
// Ensure super user access only
@@ -68,7 +70,8 @@ public class BrokerStatsBase extends AdminResource {
@GET
@Path("/mbeans")
- @ApiOperation(value = "Get all the mbean details of this broker JVM", response = Metrics.class, responseContainer = "List")
+ @ApiOperation(value = "Get all the mbean details of this broker JVM",
+ response = Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public Collection<Metrics> getMBeans() throws Exception {
// Ensure super user access only
@@ -84,10 +87,9 @@ public class BrokerStatsBase extends AdminResource {
@GET
@Path("/destinations")
- @ApiOperation(value = "Get all the topic stats by namespace", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
- // map
- // support
- // missing
+ @ApiOperation(value = "Get all the topic stats by namespace", response = OutputStream.class,
+ responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
+ // map support missing
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public StreamingOutput getTopics2() throws Exception {
// Ensure super user access only
@@ -103,7 +105,8 @@ public class BrokerStatsBase extends AdminResource {
@GET
@Path("/allocator-stats/{allocator}")
- @ApiOperation(value = "Get the stats for the Netty allocator. Available allocators are 'default' and 'ml-cache'", response = AllocatorStats.class)
+ @ApiOperation(value = "Get the stats for the Netty allocator. Available allocators are 'default' and 'ml-cache'",
+ response = AllocatorStats.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public AllocatorStats getAllocatorStats(@PathParam("allocator") String allocatorName) throws Exception {
// Ensure super user access only
@@ -121,15 +124,13 @@ public class BrokerStatsBase extends AdminResource {
@GET
@Path("/bookieops")
- @ApiOperation(value = "Get pending bookie client op stats by namesapce", response = PendingBookieOpsStats.class, // https://github.com/swagger-api/swagger-core/issues/449
- // nested
- // containers
- // are
- // not
- // supported
+ @ApiOperation(value = "Get pending bookie client op stats by namesapce",
+ response = PendingBookieOpsStats.class,
+ // https://github.com/swagger-api/swagger-core/issues/449
+ // nested containers are not supported
responseContainer = "Map")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public Map<String, Map<String, PendingBookieOpsStats>> getPendingBookieOpsStats() throws Exception {
+ public Map<String, Map<String, PendingBookieOpsStats>> getPendingBookieOpsStats() {
// Ensure super user access only
validateSuperUserAccess();
try {
@@ -142,7 +143,8 @@ public class BrokerStatsBase extends AdminResource {
@GET
@Path("/load-report")
- @ApiOperation(value = "Get Load for this broker", notes = "consists of topics stats & systemResourceUsage", response = LoadReport.class)
+ @ApiOperation(value = "Get Load for this broker", notes = "consists of topics stats & systemResourceUsage",
+ response = LoadReport.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public LoadManagerReport getLoadReport() throws Exception {
// Ensure super user access only
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index e3d9bf3..85549af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -99,11 +99,12 @@ public class BrokersBase extends AdminResource {
@GET
@Path("/{clusterName}/{broker-webserviceurl}/ownedNamespaces")
- @ApiOperation(value = "Get the list of namespaces served by the specific broker", response = NamespaceOwnershipStatus.class, responseContainer = "Map")
+ @ApiOperation(value = "Get the list of namespaces served by the specific broker",
+ response = NamespaceOwnershipStatus.class, responseContainer = "Map")
@ApiResponses(value = {
- @ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"),
- @ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 404, message = "Cluster doesn't exist") })
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Cluster doesn't exist") })
public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(@PathParam("clusterName") String cluster,
@PathParam("broker-webserviceurl") String broker) throws Exception {
validateSuperUserAccess();
@@ -122,21 +123,24 @@ public class BrokersBase extends AdminResource {
@POST
@Path("/configuration/{configName}/{configValue}")
- @ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
+ @ApiOperation(value =
+ "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = {
- @ApiResponse(code = 204, message = "Service configuration updated successfully"),
- @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
- @ApiResponse(code = 404, message = "Configuration not found"),
- @ApiResponse(code = 412, message = "Invalid dynamic-config value"),
- @ApiResponse(code = 500, message = "Internal server error")})
- public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception {
+ @ApiResponse(code = 204, message = "Service configuration updated successfully"),
+ @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
+ @ApiResponse(code = 404, message = "Configuration not found"),
+ @ApiResponse(code = 412, message = "Invalid dynamic-config value"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ public void updateDynamicConfiguration(@PathParam("configName") String configName,
+ @PathParam("configValue") String configValue) throws Exception {
validateSuperUserAccess();
updateDynamicConfigurationOnZk(configName, configValue);
}
@DELETE
@Path("/configuration/{configName}")
- @ApiOperation(value = "Delete dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
+ @ApiOperation(value =
+ "Delete dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
@ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
@ApiResponse(code = 412, message = "Invalid dynamic-config value"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 8fcef08..5e9dfbf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -523,7 +523,8 @@ public class ClustersBase extends AdminResource {
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
// construct the response to Namespace isolation data map
if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
- log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", clientAppId(), policyName, cluster);
+ log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}",
+ clientAppId(), policyName, cluster);
throw new RestException(Status.NOT_FOUND,
"Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
}
@@ -732,7 +733,8 @@ public class ClustersBase extends AdminResource {
}
}
- private boolean createZnodeIfNotExist(String path, Optional<Object> value) throws KeeperException, InterruptedException {
+ private boolean createZnodeIfNotExist(String path, Optional<Object> value)
+ throws KeeperException, InterruptedException {
// create persistent node on ZooKeeper
if (globalZk().exists(path, false) == null) {
// create all the intermediate nodes
@@ -742,7 +744,7 @@ public class ClustersBase extends AdminResource {
CreateMode.PERSISTENT);
return true;
} catch (KeeperException.NodeExistsException nee) {
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("Other broker preempted the full path [{}] already. Continue...", path);
}
} catch (JsonGenerationException e) {
@@ -979,7 +981,8 @@ public class ClustersBase extends AdminResource {
validateClusterExists(cluster);
try {
- final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
+ final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT,
+ domainName);
globalZk().delete(domainPath, -1);
// clear domain cache
failureDomainCache().invalidate(domainPath);
@@ -1004,7 +1007,9 @@ public class ClustersBase extends AdminResource {
continue;
}
try {
- Optional<FailureDomain> domain = failureDomainCache().get(joinPath(failureDomainRootPath, domainName));
+ Optional<FailureDomain> domain =
+ failureDomainCache()
+ .get(joinPath(failureDomainRootPath, domainName));
if (domain.isPresent() && domain.get().brokers != null) {
List<String> duplicateBrokers = domain.get().brokers.stream().parallel()
.filter(inputDomain.brokers::contains).collect(Collectors.toList());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 86ea652..2d2806b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -86,73 +86,91 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
@ApiParam(
- value = "A JSON value presenting configuration payload of a Pulsar Function. An example of the expected Pulsar Function can be found here. \n" +
- "- **autoAck** \n" +
- " Whether or not the framework acknowledges messages automatically. \n" +
- "- **runtime** \n" +
- " What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO] \n" +
- "- **resources** \n" +
- " The size of the system resources allowed by the Pulsar Function runtime. The resources include: cpu, ram, disk. \n" +
- "- **className** \n" +
- " The class name of a Pulsar Function. \n" +
- "- **customSchemaInputs** \n" +
- " The map of input topics to Schema class names (specified as a JSON object). \n" +
- "- **customSerdeInputs** \n" +
- " The map of input topics to SerDe class names (specified as a JSON object). \n" +
- "- **deadLetterTopic** \n" +
- " Messages that are not processed successfully are sent to `deadLetterTopic`. \n" +
- "- **runtimeFlags** \n" +
- " Any flags that you want to pass to the runtime. Note that in thread mode, these flags have no impact. \n" +
- "- **fqfn** \n" +
- " The Fully Qualified Function Name (FQFN) for the Pulsar Function. \n" +
- "- **inputSpecs** \n" +
- " The map of input topics to its consumer configuration, each configuration has schema of " +
- " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5} \n" +
- "- **inputs** \n" +
- " The input topic or topics (multiple topics can be specified as a comma-separated list) of a Pulsar Function. \n" +
- "- **jar** \n" +
- " Path to the JAR file for the Pulsar Function (if the Pulsar Function is written in Java). " +
- " It also supports URL path [http/https/file (file protocol assumes that file " +
- " already exists on worker host)] from which worker can download the package. \n" +
- "- **py** \n" +
- " Path to the main Python file or Python wheel file for the Pulsar Function (if the Pulsar Function is written in Python). \n" +
- "- **go** \n" +
- " Path to the main Go executable binary for the Pulsar Function (if the Pulsar Function is written in Go). \n" +
- "- **logTopic** \n" +
- " The topic to which the logs of a Pulsar Function are produced. \n" +
- "- **maxMessageRetries** \n" +
- " How many times should we try to process a message before giving up. \n" +
- "- **output** \n" +
- " The output topic of a Pulsar Function (If none is specified, no output is written). \n" +
- "- **outputSerdeClassName** \n" +
- " The SerDe class to be used for messages output by the Pulsar Function. \n" +
- "- **parallelism** \n" +
- " The parallelism factor of a Pulsar Function (i.e. the number of a Pulsar Function instances to run). \n" +
- "- **processingGuarantees** \n" +
- " The processing guarantees (that is, delivery semantics) applied to the Pulsar Function." +
- " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE] \n" +
- "- **retainOrdering** \n" +
- " Function consumes and processes messages in order. \n" +
- "- **outputSchemaType** \n" +
- " Represents either a builtin schema type (for example: 'avro', 'json', ect) or the class name for a Schema implementation." +
- "- **subName** \n" +
- " Pulsar source subscription name. User can specify a subscription-name for the input-topic consumer. \n" +
- "- **windowConfig** \n" +
- " The window configuration of a Pulsar Function. \n" +
- "- **timeoutMs** \n" +
- " The message timeout in milliseconds. \n" +
- "- **topicsPattern** \n" +
- " The topic pattern to consume from a list of topics under a namespace that match the pattern." +
- " [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a " +
- " pattern in customSerdeInputs (supported for java fun only) \n" +
- "- **userConfig** \n" +
- " A map of user-defined configurations (specified as a JSON object). \n" +
- "- **secrets** \n" +
- " This is a map of secretName(that is how the secret is going to be accessed in the Pulsar Function via context) to an object that" +
- " encapsulates how the secret is fetched by the underlying secrets provider. The type of an value here can be found by the" +
- " SecretProviderConfigurator.getSecretObjectType() method. \n" +
- "- **cleanupSubscription** \n" +
- " Whether the subscriptions of a Pulsar Function created or used should be deleted when the Pulsar Function is deleted. \n",
+ value = "A JSON value presenting configuration payload of a Pulsar Function."
+ + " An example of the expected Pulsar Function can be found here.\n"
+ + "- **autoAck**\n"
+ + " Whether or not the framework acknowledges messages automatically.\n"
+ + "- **runtime**\n"
+ + " What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO]\n"
+ + "- **resources**\n"
+ + " The size of the system resources allowed by the Pulsar Function runtime."
+ + " The resources include: cpu, ram, disk.\n"
+ + "- **className**\n"
+ + " The class name of a Pulsar Function.\n"
+ + "- **customSchemaInputs**\n"
+ + " The map of input topics to Schema class names (specified as a JSON object).\n"
+ + "- **customSerdeInputs**\n"
+ + " The map of input topics to SerDe class names (specified as a JSON object).\n"
+ + "- **deadLetterTopic**\n"
+ + " Messages that are not processed successfully are sent to `deadLetterTopic`.\n"
+ + "- **runtimeFlags**\n"
+ + " Any flags that you want to pass to the runtime."
+ + " Note that in thread mode, these flags have no impact.\n"
+ + "- **fqfn**\n"
+ + " The Fully Qualified Function Name (FQFN) for the Pulsar Function.\n"
+ + "- **inputSpecs**\n"
+ + " The map of input topics to its consumer configuration,"
+ + " each configuration has schema of "
+ + " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
+ + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
+ + "- **inputs**\n"
+ + " The input topic or topics (multiple topics can be specified as"
+ + " a comma-separated list) of a Pulsar Function.\n"
+ + "- **jar**\n"
+ + " Path to the JAR file for the Pulsar Function"
+ + " (if the Pulsar Function is written in Java). "
+ + " It also supports URL path [http/https/file (file protocol assumes that file "
+ + " already exists on worker host)] from which worker can download the package.\n"
+ + "- **py**\n"
+ + " Path to the main Python file or Python wheel file for the"
+ + " Pulsar Function (if the Pulsar Function is written in Python).\n"
+ + "- **go**\n"
+ + " Path to the main Go executable binary for the Pulsar Function"
+ + " (if the Pulsar Function is written in Go).\n"
+ + "- **logTopic**\n"
+ + " The topic to which the logs of a Pulsar Function are produced.\n"
+ + "- **maxMessageRetries**\n"
+ + " How many times should we try to process a message before giving up.\n"
+ + "- **output**\n"
+ + " The output topic of a Pulsar Function"
+ + " (If none is specified, no output is written).\n"
+ + "- **outputSerdeClassName**\n"
+ + " The SerDe class to be used for messages output by the Pulsar Function.\n"
+ + "- **parallelism**\n"
+ + " The parallelism factor of a Pulsar Function"
+ + " (i.e. the number of a Pulsar Function instances to run).\n"
+ + "- **processingGuarantees**\n"
+ + " The processing guarantees (that is, delivery semantics)"
+ + " applied to the Pulsar Function."
+ + " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
+ + "- **retainOrdering**\n"
+ + " Function consumes and processes messages in order.\n"
+ + "- **outputSchemaType**\n"
+ + " Represents either a builtin schema type (for example: 'avro', 'json', ect)"
+ + " or the class name for a Schema implementation."
+ + "- **subName**\n"
+ + " Pulsar source subscription name. User can specify a subscription-name"
+ + " for the input-topic consumer.\n"
+ + "- **windowConfig**\n"
+ + " The window configuration of a Pulsar Function.\n"
+ + "- **timeoutMs**\n"
+ + " The message timeout in milliseconds.\n"
+ + "- **topicsPattern**\n"
+ + " The topic pattern to consume from a list of topics under a namespace"
+ + " that match the pattern."
+ + " [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a "
+ + " pattern in customSerdeInputs (supported for java fun only)\n"
+ + "- **userConfig**\n"
+ + " A map of user-defined configurations (specified as a JSON object).\n"
+ + "- **secrets**\n"
+ + " This is a map of secretName(that is how the secret is going to be accessed"
+ + " in the Pulsar Function via context) to an object that"
+ + " encapsulates how the secret is fetched by the underlying secrets provider."
+ + " The type of an value here can be found by the"
+ + " SecretProviderConfigurator.getSecretObjectType() method. \n"
+ + "- **cleanupSubscription**\n"
+ + " Whether the subscriptions of a Pulsar Function created or used should be deleted"
+ + " when the Pulsar Function is deleted.\n",
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
@@ -193,73 +211,90 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
@ApiParam(
- value = "A JSON value presenting configuration payload of a Pulsar Function. An example of the expected Pulsar Function can be found here. \n" +
- "- **autoAck** \n" +
- " Whether or not the framework acknowledges messages automatically. \n" +
- "- **runtime** \n" +
- " What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO] \n" +
- "- **resources** \n" +
- " The size of the system resources allowed by the Pulsar Function runtime. The resources include: cpu, ram, disk. \n" +
- "- **className** \n" +
- " The class name of a Pulsar Function. \n" +
- "- **customSchemaInputs** \n" +
- " The map of input topics to Schema class names (specified as a JSON object). \n" +
- "- **customSerdeInputs** \n" +
- " The map of input topics to SerDe class names (specified as a JSON object). \n" +
- "- **deadLetterTopic** \n" +
- " Messages that are not processed successfully are sent to `deadLetterTopic`. \n" +
- "- **runtimeFlags** \n" +
- " Any flags that you want to pass to the runtime. Note that in thread mode, these flags have no impact. \n" +
- "- **fqfn** \n" +
- " The Fully Qualified Function Name (FQFN) for the Pulsar Function. \n" +
- "- **inputSpecs** \n" +
- " The map of input topics to its consumer configuration, each configuration has schema of " +
- " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5} \n" +
- "- **inputs** \n" +
- " The input topic or topics (multiple topics can be specified as a comma-separated list) of a Pulsar Function. \n" +
- "- **jar** \n" +
- " Path to the JAR file for the Pulsar Function (if the Pulsar Function is written in Java). " +
- " It also supports URL path [http/https/file (file protocol assumes that file " +
- " already exists on worker host)] from which worker can download the package. \n" +
- "- **py** \n" +
- " Path to the main Python file or Python wheel file for the Pulsar Function (if the Pulsar Function is written in Python). \n" +
- "- **go** \n" +
- " Path to the main Go executable binary for the Pulsar Function (if the Pulsar Function is written in Go). \n" +
- "- **logTopic** \n" +
- " The topic to which the logs of a Pulsar Function are produced. \n" +
- "- **maxMessageRetries** \n" +
- " How many times should we try to process a message before giving up. \n" +
- "- **output** \n" +
- " The output topic of a Pulsar Function (If none is specified, no output is written). \n" +
- "- **outputSerdeClassName** \n" +
- " The SerDe class to be used for messages output by the Pulsar Function. \n" +
- "- **parallelism** \n" +
- " The parallelism factor of a Pulsar Function (i.e. the number of a Pulsar Function instances to run). \n" +
- "- **processingGuarantees** \n" +
- " The processing guarantees (that is, delivery semantics) applied to the Pulsar Function." +
- " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE] \n" +
- "- **retainOrdering** \n" +
- " Function consumes and processes messages in order. \n" +
- "- **outputSchemaType** \n" +
- " Represents either a builtin schema type (for example: 'avro', 'json', ect) or the class name for a Schema implementation." +
- "- **subName** \n" +
- " Pulsar source subscription name. User can specify a subscription-name for the input-topic consumer. \n" +
- "- **windowConfig** \n" +
- " The window configuration of a Pulsar Function. \n" +
- "- **timeoutMs** \n" +
- " The message timeout in milliseconds. \n" +
- "- **topicsPattern** \n" +
- " The topic pattern to consume from a list of topics under a namespace that match the pattern." +
- " [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a " +
- " pattern in customSerdeInputs (supported for java fun only) \n" +
- "- **userConfig** \n" +
- " A map of user-defined configurations (specified as a JSON object). \n" +
- "- **secrets** \n" +
- " This is a map of secretName(that is how the secret is going to be accessed in the Pulsar Function via context) to an object that" +
- " encapsulates how the secret is fetched by the underlying secrets provider. The type of an value here can be found by the" +
- " SecretProviderConfigurator.getSecretObjectType() method. \n" +
- "- **cleanupSubscription** \n" +
- " Whether the subscriptions of a Pulsar Function created or used should be deleted when the Pulsar Function is deleted. \n",
+ value = "A JSON value presenting configuration payload of a Pulsar Function."
+ + " An example of the expected Pulsar Function can be found here.\n"
+ + "- **autoAck**\n"
+ + " Whether or not the framework acknowledges messages automatically.\n"
+ + "- **runtime**\n"
+ + " What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO]\n"
+ + "- **resources**\n"
+ + " The size of the system resources allowed by the Pulsar Function runtime."
+ + " The resources include: cpu, ram, disk.\n"
+ + "- **className**\n"
+ + " The class name of a Pulsar Function.\n"
+ + "- **customSchemaInputs**\n"
+ + " The map of input topics to Schema class names (specified as a JSON object).\n"
+ + "- **customSerdeInputs**\n"
+ + " The map of input topics to SerDe class names (specified as a JSON object).\n"
+ + "- **deadLetterTopic**\n"
+ + " Messages that are not processed successfully are sent to `deadLetterTopic`.\n"
+ + "- **runtimeFlags**\n"
+ + " Any flags that you want to pass to the runtime."
+ + " Note that in thread mode, these flags have no impact.\n"
+ + "- **fqfn**\n"
+ + " The Fully Qualified Function Name (FQFN) for the Pulsar Function.\n"
+ + "- **inputSpecs**\n"
+ + " The map of input topics to its consumer configuration,"
+ + " each configuration has schema of "
+ + " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
+ + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
+ + "- **inputs**\n"
+ + " The input topic or topics (multiple topics can be specified as"
+ + " a comma-separated list) of a Pulsar Function.\n"
+ + "- **jar**\n"
+ + " Path to the JAR file for the Pulsar Function"
+ + " (if the Pulsar Function is written in Java). "
+ + " It also supports URL path [http/https/file (file protocol assumes that file "
+ + " already exists on worker host)] from which worker can download the package.\n"
+ + "- **py**\n"
+ + " Path to the main Python file or Python wheel file for the Pulsar Function"
+ + " (if the Pulsar Function is written in Python).\n"
+ + "- **go**\n"
+ + " Path to the main Go executable binary for the Pulsar Function"
+ + " (if the Pulsar Function is written in Go).\n"
+ + "- **logTopic**\n"
+ + " The topic to which the logs of a Pulsar Function are produced.\n"
+ + "- **maxMessageRetries**\n"
+ + " How many times should we try to process a message before giving up.\n"
+ + "- **output**\n"
+ + " The output topic of a Pulsar Function (If none is specified, no output is written).\n"
+ + "- **outputSerdeClassName**\n"
+ + " The SerDe class to be used for messages output by the Pulsar Function.\n"
+ + "- **parallelism**\n"
+ + " The parallelism factor of a Pulsar Function "
+ + "(i.e. the number of a Pulsar Function instances to run).\n"
+ + "- **processingGuarantees**\n"
+ + " The processing guarantees (that is, delivery semantics)"
+ + " applied to the Pulsar Function."
+ + " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
+ + "- **retainOrdering**\n"
+ + " Function consumes and processes messages in order.\n"
+ + "- **outputSchemaType**\n"
+ + " Represents either a builtin schema type (for example: 'avro', 'json', ect)"
+ + " or the class name for a Schema implementation."
+ + "- **subName**\n"
+ + " Pulsar source subscription name. User can specify"
+ + " a subscription-name for the input-topic consumer.\n"
+ + "- **windowConfig**\n"
+ + " The window configuration of a Pulsar Function.\n"
+ + "- **timeoutMs**\n"
+ + " The message timeout in milliseconds.\n"
+ + "- **topicsPattern**\n"
+ + " The topic pattern to consume from a list of topics"
+ + " under a namespace that match the pattern."
+ + " [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a "
+ + " pattern in customSerdeInputs (supported for java fun only)\n"
+ + "- **userConfig**\n"
+ + " A map of user-defined configurations (specified as a JSON object).\n"
+ + "- **secrets**\n"
+ + " This is a map of secretName(that is how the secret is going to be accessed"
+ + " in the Pulsar Function via context) to an object that"
+ + " encapsulates how the secret is fetched by the underlying secrets provider."
+ + " The type of an value here can be found by the"
+ + " SecretProviderConfigurator.getSecretObjectType() method. \n"
+ + "- **cleanupSubscription**\n"
+ + " Whether the subscriptions of a Pulsar Function created or used"
+ + " should be deleted when the Pulsar Function is deleted.\n",
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
@@ -339,15 +374,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
- @ApiParam(value = "The tenant of a Pulsar Function")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Function")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Function")
- final @PathParam("functionName") String functionName,
- @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, the stats of all instances is returned")
- final @PathParam("instanceId") String instanceId) throws IOException {
- return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+ @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided,"
+ + " the stats of all instances are returned)") final @PathParam("instanceId")
+ String instanceId) throws IOException {
+ return functions.getFunctionInstanceStatus(tenant, namespace, functionName,
+ instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -370,7 +404,8 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
@ApiParam(value = "The name of a Pulsar Function")
final @PathParam("functionName") String functionName) throws IOException {
- return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
+ return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri(),
+ clientAppId(), clientAuthData());
}
@GET
@@ -393,7 +428,8 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
@ApiParam(value = "The name of a Pulsar Function")
final @PathParam("functionName") String functionName) throws IOException {
- return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
+ return functions.getFunctionStats(tenant, namespace, functionName,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -410,15 +446,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(
- @ApiParam(value = "The tenant of a Pulsar Function")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Function")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Function")
- final @PathParam("functionName") String functionName,
- @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, the stats of all instances is returned")
- final @PathParam("instanceId") String instanceId) throws IOException {
- return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+ @ApiParam(value = "The instanceId of a Pulsar Function"
+ + " (if instance-id is not provided, the stats of all instances are returned)") final @PathParam(
+ "instanceId") String instanceId) throws IOException {
+ return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -454,19 +489,19 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Path("/{tenant}/{namespace}/{functionName}/trigger")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public String triggerFunction(
- @ApiParam(value = "The tenant of a Pulsar Function")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Function")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Function")
- final @PathParam("functionName") String functionName,
- @ApiParam(value = "The value with which you want to trigger the Pulsar Function")
- final @FormDataParam("data") String triggerValue,
- @ApiParam(value = "The path to the file that contains the data with which you'd like to trigger the Pulsar Function")
- final @FormDataParam("dataStream") InputStream triggerStream,
- @ApiParam(value = "The specific topic name that the Pulsar Function consumes from which you want to inject the data to")
- final @FormDataParam("topic") String topic) {
- return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientAppId(), clientAuthData());
+ @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+ @ApiParam(value = "The value with which you want to trigger the Pulsar Function") final @FormDataParam(
+ "data") String triggerValue,
+ @ApiParam(value = "The path to the file that contains the data with"
+ + " which you'd like to trigger the Pulsar Function") final @FormDataParam("dataStream")
+ InputStream triggerStream,
+ @ApiParam(value = "The specific topic name that the Pulsar Function"
+ + " consumes from which you want to inject the data to") final @FormDataParam("topic")
+ String topic) {
+ return functions.triggerFunction(tenant, namespace, functionName, triggerValue,
+ triggerStream, topic, clientAppId(), clientAuthData());
}
@GET
@@ -524,15 +559,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
@Consumes(MediaType.APPLICATION_JSON)
public void restartFunction(
- @ApiParam(value = "The tenant of a Pulsar Function")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Function")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Function")
- final @PathParam("functionName") String functionName,
- @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are restarted")
+ @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+ @ApiParam(value =
+ "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are restarted)")
final @PathParam("instanceId") String instanceId) {
- functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ functions.restartFunctionInstance(tenant, namespace, functionName, instanceId,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -564,15 +598,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
@Consumes(MediaType.APPLICATION_JSON)
public void stopFunction(
- @ApiParam(value = "The tenant of a Pulsar Function")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Function")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Function")
- final @PathParam("functionName") String functionName,
- @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are stopped. ")
+ @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+ @ApiParam(value =
+ "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are stopped)")
final @PathParam("instanceId") String instanceId) {
- functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ functions.stopFunctionInstance(tenant, namespace, functionName, instanceId,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -604,15 +637,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startFunction(
- @ApiParam(value = "The tenant of a Pulsar Function")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Function")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Function")
- final @PathParam("functionName") String functionName,
- @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, all instances sre started. ")
- final @PathParam("instanceId") String instanceId) {
- functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+ @ApiParam(value = "The instanceId of a Pulsar Function"
+ + " (if instance-id is not provided, all instances are started)") final @PathParam("instanceId")
+ String instanceId) {
+ functions.startFunctionInstance(tenant, namespace, functionName, instanceId,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -704,10 +736,11 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Path("/leader/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName,
- final @FormDataParam("functionMetaData") InputStream uploadedInputStream,
- final @FormDataParam("delete") boolean delete) {
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("functionMetaData")
+ InputStream uploadedInputStream,
+ final @FormDataParam("delete") boolean delete) {
functions.updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
delete, uri.getRequestUri(), clientAppId());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a1f4821..3fbe44d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -526,8 +526,9 @@ public abstract class NamespacesBase extends AdminResource {
}
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
.port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
- if(log.isDebugEnabled()) {
- log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, replCluster);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Redirecting the rest call to {}: cluster={}",
+ clientAppId(), redirect, replCluster);
}
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
@@ -597,9 +598,11 @@ public abstract class NamespacesBase extends AdminResource {
"The replication cluster does not provide TLS encrypted service");
}
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
- .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
- if(log.isDebugEnabled()) {
- log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, replCluster);
+ .port(replClusterUrl.getPort())
+ .replaceQueryParam("authoritative", false).build();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Redirecting the rest call to {}: cluster={}",
+ clientAppId(), redirect, replCluster);
}
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
@@ -796,7 +799,8 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to update the replication clusters on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to update the replication clusters on"
+ + " namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -835,7 +839,8 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to update the message TTL on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to update the message TTL"
+ + " on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -874,7 +879,8 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to update the subscription expiration time on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to update the subscription expiration time on"
+ + " namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
@@ -884,7 +890,8 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
+ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
+ AutoTopicCreationOverride autoTopicCreationOverride) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
@@ -893,7 +900,8 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
}
if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
- throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
+ throw new RestException(Status.NOT_ACCEPTABLE,
+ "Number of partitions should be less than or equal to " + maxPartitions);
}
// Force to read the data s.t. the watch to the cache content is setup.
@@ -905,12 +913,16 @@ public abstract class NamespacesBase extends AdminResource {
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
- jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+ jsonMapper().writeValueAsBytes(
+ policiesNode.getKey()), policiesNode.getValue().getVersion(),
(rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- String autoOverride = autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled";
- log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
+ String autoOverride =
+ autoTopicCreationOverride
+ .allowAutoTopicCreation ? "enabled" : "disabled";
+ log.info("[{}] Successfully {} autoTopicCreation on namespace {}",
+ clientAppId(), autoOverride, namespaceName);
asyncResponse.resume(Response.noContent().build());
} else {
String errorMsg = String.format(
@@ -918,28 +930,35 @@ public abstract class NamespacesBase extends AdminResource {
clientAppId(), namespaceName);
if (rc == KeeperException.Code.NONODE.intValue()) {
log.warn("{} : does not exist", errorMsg);
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Namespace does not exist"));
} else if (rc == KeeperException.Code.BADVERSION.intValue()) {
log.warn("{} : concurrent modification", errorMsg);
- asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+ asyncResponse.resume(new RestException(Status.CONFLICT,
+ "Concurrent modification"));
} else {
- asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+ asyncResponse.resume(
+ KeeperException.create(
+ KeeperException.Code.get(rc), errorMsg));
}
}
}, null);
return null;
} catch (Exception e) {
- log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify autoTopicCreation status on namespace {}",
+ clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
- log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify autoTopicCreation status on namespace {}",
+ clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
@@ -958,11 +977,14 @@ public abstract class NamespacesBase extends AdminResource {
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
- jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+ jsonMapper().writeValueAsBytes(policiesNode.getKey()),
+ policiesNode.getValue().getVersion(),
(rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- log.info("[{}] Successfully removed autoTopicCreation override on namespace {}", clientAppId(), namespaceName);
+ log.info("[{}] Successfully removed autoTopicCreation override"
+ + " on namespace {}",
+ clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
} else {
String errorMsg = String.format(
@@ -970,34 +992,41 @@ public abstract class NamespacesBase extends AdminResource {
clientAppId(), namespaceName);
if (rc == KeeperException.Code.NONODE.intValue()) {
log.warn("{} : does not exist", errorMsg);
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Namespace does not exist"));
} else if (rc == KeeperException.Code.BADVERSION.intValue()) {
log.warn("{} : concurrent modification", errorMsg);
- asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+ asyncResponse.resume(new RestException(Status.CONFLICT,
+ "Concurrent modification"));
} else {
- asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+ asyncResponse.resume(KeeperException.create(
+ KeeperException.Code.get(rc), errorMsg));
}
}
}, null);
return null;
} catch (Exception e) {
- log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify autoTopicCreation status on namespace {}",
+ clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
- log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify autoTopicCreation status on namespace {}",
+ clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
}
- protected void internalSetAutoSubscriptionCreation(AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
+ protected void internalSetAutoSubscriptionCreation(
+ AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
@@ -1010,41 +1039,52 @@ public abstract class NamespacesBase extends AdminResource {
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
- jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+ jsonMapper().writeValueAsBytes(policiesNode.getKey()),
+ policiesNode.getValue().getVersion(),
(rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- String autoOverride = autoSubscriptionCreationOverride.allowAutoSubscriptionCreation ? "enabled" : "disabled";
- log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
+ String autoOverride =
+ autoSubscriptionCreationOverride
+ .allowAutoSubscriptionCreation ? "enabled" : "disabled";
+ log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}",
+ clientAppId(), autoOverride, namespaceName);
asyncResponse.resume(Response.noContent().build());
} else {
String errorMsg = String.format(
- "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+ "[%s] Failed to modify autoSubscriptionCreation"
+ + " status for namespace %s",
clientAppId(), namespaceName);
if (rc == KeeperException.Code.NONODE.intValue()) {
log.warn("{} : does not exist", errorMsg);
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Namespace does not exist"));
} else if (rc == KeeperException.Code.BADVERSION.intValue()) {
log.warn("{} : concurrent modification", errorMsg);
- asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+ asyncResponse.resume(new RestException(Status.CONFLICT,
+ "Concurrent modification"));
} else {
- asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+ asyncResponse.resume(KeeperException.create(
+ KeeperException.Code.get(rc), errorMsg));
}
}
}, null);
return null;
} catch (Exception e) {
- log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}",
+ clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
- log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}",
+ clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
@@ -1063,40 +1103,50 @@ public abstract class NamespacesBase extends AdminResource {
try {
// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
- jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+ jsonMapper().writeValueAsBytes(
+ policiesNode.getKey()), policiesNode.getValue().getVersion(),
(rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- log.info("[{}] Successfully removed autoSubscriptionCreation override on namespace {}", clientAppId(), namespaceName);
+ log.info("[{}] Successfully removed autoSubscriptionCreation"
+ + " override on namespace {}",
+ clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
} else {
String errorMsg = String.format(
- "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+ "[%s] Failed to modify autoSubscriptionCreation"
+ + " status for namespace %s",
clientAppId(), namespaceName);
if (rc == KeeperException.Code.NONODE.intValue()) {
log.warn("{} : does not exist", errorMsg);
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Namespace does not exist"));
} else if (rc == KeeperException.Code.BADVERSION.intValue()) {
log.warn("{} : concurrent modification", errorMsg);
- asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+ asyncResponse.resume(new RestException(Status.CONFLICT,
+ "Concurrent modification"));
} else {
- asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+ asyncResponse.resume(KeeperException.create(
+ KeeperException.Code.get(rc), errorMsg));
}
}
}, null);
return null;
} catch (Exception e) {
- log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}",
+ clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
- log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}",
+ clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
@@ -1127,12 +1177,14 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to modify deduplication status on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to modify deduplication status on"
+ + " namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
- log.error("[{}] Failed to modify deduplication status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify deduplication status on namespace {}",
+ clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
@@ -1254,15 +1306,16 @@ public abstract class NamespacesBase extends AdminResource {
String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString());
try {
Optional<LocalPolicies> policies = pulsar().getLocalZkCacheService().policiesCache().get(path);
- final BookieAffinityGroupData bookkeeperAffinityGroup = policies.orElseThrow(() -> new RestException(Status.NOT_FOUND,
- "Namespace local-policies does not exist")).bookieAffinityGroup;
+ final BookieAffinityGroupData bookkeeperAffinityGroup =
+ policies.orElseThrow(() -> new RestException(Status.NOT_FOUND,
+ "Namespace local-policies does not exist")).bookieAffinityGroup;
if (bookkeeperAffinityGroup == null) {
throw new RestException(Status.NOT_FOUND, "bookie-affinity group does not exist");
}
return bookkeeperAffinityGroup;
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist",
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace policies does not exist");
} catch (RestException re) {
throw re;
@@ -1281,13 +1334,16 @@ public abstract class NamespacesBase extends AdminResource {
Policies policies = getNamespacePolicies(namespaceName);
- NamespaceBundle bundle = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName.toString(), bundleRange);
+ NamespaceBundle bundle =
+ pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(namespaceName.toString(), bundleRange);
boolean isOwnedByLocalCluster = false;
try {
isOwnedByLocalCluster = pulsar().getNamespaceService().isNamespaceBundleOwned(bundle).get();
} catch (Exception e) {
- if(log.isDebugEnabled()) {
- log.debug("Failed to validate cluster ownership for {}-{}, {}", namespaceName.toString(), bundleRange, e.getMessage(), e);
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to validate cluster ownership for {}-{}, {}",
+ namespaceName.toString(), bundleRange, e.getMessage(), e);
}
}
@@ -1328,7 +1384,8 @@ public abstract class NamespacesBase extends AdminResource {
}
@SuppressWarnings("deprecation")
- protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
+ protected void internalSplitNamespaceBundle(String bundleRange,
+ boolean authoritative, boolean unload, String splitAlgorithmName) {
validateSuperUserAccess();
checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
@@ -1347,14 +1404,18 @@ public abstract class NamespacesBase extends AdminResource {
NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
- List<String> supportedNamespaceBundleSplitAlgorithms = pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
- if (StringUtils.isNotBlank(splitAlgorithmName) && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
+ List<String> supportedNamespaceBundleSplitAlgorithms =
+ pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
+ if (StringUtils.isNotBlank(splitAlgorithmName)
+ && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
throw new RestException(Status.PRECONDITION_FAILED,
- "Unsupported namespace bundle split algorithm, supported algorithms are " + supportedNamespaceBundleSplitAlgorithms);
+ "Unsupported namespace bundle split algorithm, supported algorithms are "
+ + supportedNamespaceBundleSplitAlgorithms);
}
try {
- pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
+ pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
+ getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalArgumentException) {
@@ -1374,7 +1435,8 @@ public abstract class NamespacesBase extends AdminResource {
private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName);
if (algorithm == null) {
- algorithm = NamespaceBundleSplitAlgorithm.of(pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
+ algorithm = NamespaceBundleSplitAlgorithm.of(
+ pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
}
if (algorithm == null) {
algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
@@ -1393,14 +1455,16 @@ public abstract class NamespacesBase extends AdminResource {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
- policiesNode.getKey().publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(), maxPublishMessageRate);
+ policiesNode.getKey().publishMaxMessageRate.put(
+ pulsar().getConfiguration().getClusterName(), maxPublishMessageRate);
// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
policiesNode.getValue().getVersion());
policiesCache().invalidate(path);
- log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}", clientAppId(),
+ log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}",
+ clientAppId(),
namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the publish_max_message_rate for cluster on namespace {}: does not exist",
@@ -1408,7 +1472,8 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to update the publish_max_message_rate for cluster on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to update the publish_max_message_rate for cluster on namespace {}"
+ + " expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -1443,7 +1508,8 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to remove the publish_max_message_rate for cluster on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to remove the publish_max_message_rate for cluster on"
+ + " namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -1495,7 +1561,8 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to update the dispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to update the dispatchRate for cluster on"
+ + " namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -1533,24 +1600,26 @@ public abstract class NamespacesBase extends AdminResource {
final String path = path(POLICIES, namespaceName.toString());
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
- () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
- policiesNode.getKey().subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+ () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+ policiesNode.getKey().subscriptionDispatchRate
+ .put(pulsar().getConfiguration().getClusterName(), dispatchRate);
// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
- policiesNode.getValue().getVersion());
+ policiesNode.getValue().getVersion());
policiesCache().invalidate(path);
- log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
- namespaceName);
+ log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on namespace {}",
+ clientAppId(), namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
- clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+ "[{}] Failed to update the subscriptionDispatchRate for cluster on"
+ + " namespace {} expected policy node version={} : concurrent modification",
+ clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
@@ -1564,12 +1633,14 @@ public abstract class NamespacesBase extends AdminResource {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
- DispatchRate dispatchRate = policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
+ DispatchRate dispatchRate =
+ policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
} else {
throw new RestException(Status.NOT_FOUND,
- "Subscription-Dispatch-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
+ "Subscription-Dispatch-rate is not configured for cluster "
+ + pulsar().getConfiguration().getClusterName());
}
}
@@ -1585,7 +1656,8 @@ public abstract class NamespacesBase extends AdminResource {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
- policiesNode.getKey().clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);
+ policiesNode.getKey().clusterSubscribeRate.put(
+ pulsar().getConfiguration().getClusterName(), subscribeRate);
// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
@@ -1600,7 +1672,8 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to update the subscribeRate for cluster on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to update the subscribeRate for cluster on"
+ + " namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -1633,24 +1706,26 @@ public abstract class NamespacesBase extends AdminResource {
final String path = path(POLICIES, namespaceName.toString());
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
- () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
- policiesNode.getKey().replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+ () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+ policiesNode.getKey().replicatorDispatchRate.put(
+ pulsar().getConfiguration().getClusterName(), dispatchRate);
// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
- policiesNode.getValue().getVersion());
+ policiesNode.getValue().getVersion());
policiesCache().invalidate(path);
log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
- namespaceName);
+ namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to update the replicatorDispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
- clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+ "[{}] Failed to update the replicatorDispatchRate for cluster on"
+ + " namespace {} expected policy node version={} : concurrent modification",
+ clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
@@ -1669,7 +1744,8 @@ public abstract class NamespacesBase extends AdminResource {
return dispatchRate;
} else {
throw new RestException(Status.NOT_FOUND,
- "replicator-Dispatch-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
+ "replicator-Dispatch-rate is not configured for cluster "
+ + pulsar().getConfiguration().getClusterName());
}
}
@@ -1692,10 +1768,12 @@ public abstract class NamespacesBase extends AdminResource {
p.backlog_quota_map.put(backlogQuotaType, backlogQuota);
if (!checkQuotas(p, r)) {
log.warn(
- "[{}] Failed to update backlog configuration for namespace {}: conflicts with retention quota",
+ "[{}] Failed to update backlog configuration"
+ + " for namespace {}: conflicts with retention quota",
clientAppId(), namespaceName);
throw new RestException(Status.PRECONDITION_FAILED,
- "Backlog Quota exceeds configured retention quota for namespace. Please increase retention quota and retry");
+ "Backlog Quota exceeds configured retention quota for namespace."
+ + " Please increase retention quota and retry");
}
}
policies.backlog_quota_map.put(backlogQuotaType, backlogQuota);
@@ -1709,8 +1787,8 @@ public abstract class NamespacesBase extends AdminResource {
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update backlog quota map for namespace {}: concurrent modification", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update backlog quota map for namespace {}: concurrent modification",
+ clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
@@ -1744,8 +1822,8 @@ public abstract class NamespacesBase extends AdminResource {
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update backlog quota map for namespace {}: concurrent modification", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update backlog quota map for namespace {}: concurrent modification",
+ clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e);
@@ -1764,7 +1842,8 @@ public abstract class NamespacesBase extends AdminResource {
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
if (!checkQuotas(policies, retention)) {
- log.warn("[{}] Failed to update retention configuration for namespace {}: conflicts with backlog quota",
+ log.warn("[{}] Failed to update retention configuration"
+ + " for namespace {}: conflicts with backlog quota",
clientAppId(), namespaceName);
throw new RestException(Status.PRECONDITION_FAILED,
"Retention Quota must exceed configured backlog quota for namespace.");
@@ -1776,8 +1855,8 @@ public abstract class NamespacesBase extends AdminResource {
namespaceName, jsonMapper().writeValueAsString(policies.retention_policies));
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update retention configuration for namespace {}: does not exist", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update retention configuration for namespace {}: does not exist",
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update retention configuration for namespace {}: concurrent modification",
@@ -2092,13 +2171,14 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to modify encryption required status on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to modify encryption required status on"
+ + " namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
- log.error("[{}] Failed to modify encryption required status on namespace {}", clientAppId(), namespaceName,
- e);
+ log.error("[{}] Failed to modify encryption required status on namespace {}", clientAppId(),
+ namespaceName, e);
throw new RestException(e);
}
}
@@ -2333,13 +2413,15 @@ public abstract class NamespacesBase extends AdminResource {
subscription = PersistentReplicator.getRemoteCluster(subscription);
}
for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
+ if (topic instanceof PersistentTopic
+ && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
futures.add(((PersistentTopic) topic).clearBacklog(subscription));
}
}
} else {
for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
+ if (topic instanceof PersistentTopic
+ && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
futures.add(((PersistentTopic) topic).clearBacklog());
}
}
@@ -2383,12 +2465,10 @@ public abstract class NamespacesBase extends AdminResource {
}
/**
- * It validates that peer-clusters can't coexist in replication-clusters
+ * It validates that peer-clusters can't coexist in replication-clusters.
*
- * @param clusterName:
- * given cluster whose peer-clusters can't be present into replication-cluster list
- * @param replicationClusters:
- * replication-cluster list
+ * @param clusterName: given cluster whose peer-clusters can't be present into replication-cluster list
+ * @param replicationClusters: replication-cluster list
*/
private void validatePeerClusterConflict(String clusterName, Set<String> replicationClusters) {
try {
@@ -2493,11 +2573,11 @@ public abstract class NamespacesBase extends AdminResource {
"Invalid retention policy: size limit must be >= -1");
checkArgument(retention.getRetentionTimeInMinutes() >= -1,
"Invalid retention policy: time limit must be >= -1");
- checkArgument((retention.getRetentionTimeInMinutes() != 0 && retention.getRetentionSizeInMB() != 0) ||
- (retention.getRetentionTimeInMinutes() == 0 && retention.getRetentionSizeInMB() == 0),
- "Invalid retention policy: Setting a single time or size limit to 0 is invalid when " +
- "one of the limits has a non-zero value. Use the value of -1 instead of 0 to ignore a " +
- "specific limit. To disable retention both limits must be set to 0.");
+ checkArgument((retention.getRetentionTimeInMinutes() != 0 && retention.getRetentionSizeInMB() != 0)
+ || (retention.getRetentionTimeInMinutes() == 0 && retention.getRetentionSizeInMB() == 0),
+ "Invalid retention policy: Setting a single time or size limit to 0 is invalid when "
+ + "one of the limits has a non-zero value. Use the value of -1 instead of 0 to ignore a "
+ + "specific limit. To disable retention both limits must be set to 0.");
}
protected int internalGetMaxProducersPerTopic() {
@@ -2534,22 +2614,23 @@ public abstract class NamespacesBase extends AdminResource {
policies.max_producers_per_topic = maxProducersPerTopic;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- log.info("[{}] Successfully updated maxProducersPerTopic configuration: namespace={}, value={}", clientAppId(),
- namespaceName, policies.max_producers_per_topic);
+ log.info("[{}] Successfully updated maxProducersPerTopic configuration: namespace={}, value={}",
+ clientAppId(), namespaceName, policies.max_producers_per_topic);
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: does not exist", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: does not exist",
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: concurrent modification",
+ log.warn("[{}] Failed to update maxProducersPerTopic configuration for"
+ + " namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
- log.error("[{}] Failed to update maxProducersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
- e);
+ log.error("[{}] Failed to update maxProducersPerTopic configuration for namespace {}",
+ clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
@@ -2575,22 +2656,23 @@ public abstract class NamespacesBase extends AdminResource {
policies.max_consumers_per_topic = maxConsumersPerTopic;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- log.info("[{}] Successfully updated maxConsumersPerTopic configuration: namespace={}, value={}", clientAppId(),
- namespaceName, policies.max_consumers_per_topic);
+ log.info("[{}] Successfully updated maxConsumersPerTopic configuration: namespace={}, value={}",
+ clientAppId(), namespaceName, policies.max_consumers_per_topic);
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: does not exist", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: does not exist",
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: concurrent modification",
+ log.warn("[{}] Failed to update maxConsumersPerTopic configuration for"
+ + " namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
- log.error("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
- e);
+ log.error("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}",
+ clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
@@ -2616,22 +2698,23 @@ public abstract class NamespacesBase extends AdminResource {
policies.max_consumers_per_subscription = maxConsumersPerSubscription;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- log.info("[{}] Successfully updated maxConsumersPerSubscription configuration: namespace={}, value={}", clientAppId(),
- namespaceName, policies.max_consumers_per_subscription);
+ log.info("[{}] Successfully updated maxConsumersPerSubscription configuration: namespace={}, value={}",
+ clientAppId(), namespaceName, policies.max_consumers_per_subscription);
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: does not exist", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: does not exist",
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: concurrent modification",
+ log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for"
+ + " namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
- log.error("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}", clientAppId(), namespaceName,
- e);
+ log.error("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}",
+ clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
@@ -2657,22 +2740,24 @@ public abstract class NamespacesBase extends AdminResource {
policies.max_unacked_messages_per_consumer = maxUnackedMessagesPerConsumer;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- log.info("[{}] Successfully updated maxUnackedMessagesPerConsumer configuration: namespace={}, value={}", clientAppId(),
- namespaceName, policies.max_unacked_messages_per_consumer);
+ log.info("[{}] Successfully updated maxUnackedMessagesPerConsumer configuration: namespace={}, value={}",
+ clientAppId(), namespaceName, policies.max_unacked_messages_per_consumer);
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}: does not exist", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration"
+ + " for namespace {}: does not exist",
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}: concurrent modification",
+ log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration"
+ + " for namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
- log.error("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}", clientAppId(), namespaceName,
- e);
+ log.error("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}",
+ clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
@@ -2698,22 +2783,25 @@ public abstract class NamespacesBase extends AdminResource {
policies.max_unacked_messages_per_subscription = maxUnackedMessagesPerSubscription;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- log.info("[{}] Successfully updated maxUnackedMessagesPerSubscription configuration: namespace={}, value={}", clientAppId(),
- namespaceName, policies.max_unacked_messages_per_subscription);
+ log.info("[{}] Successfully updated maxUnackedMessagesPerSubscription"
+ + " configuration: namespace={}, value={}",
+ clientAppId(), namespaceName, policies.max_unacked_messages_per_subscription);
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}: does not exist", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for"
+ + " namespace {}: does not exist",
+ clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}: concurrent modification",
+ log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for"
+ + " namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
- log.error("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}", clientAppId(), namespaceName,
- e);
+ log.error("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}",
+ clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
@@ -2747,8 +2835,9 @@ public abstract class NamespacesBase extends AdminResource {
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update compactionThreshold configuration for namespace {}: concurrent modification",
- clientAppId(), namespaceName);
+ log.warn("[{}] Failed to update compactionThreshold configuration for"
+ + " namespace {}: concurrent modification",
+ clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
@@ -2843,8 +2932,9 @@ public abstract class NamespacesBase extends AdminResource {
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace {}: concurrent modification",
- clientAppId(), namespaceName);
+ log.warn("[{}] Failed to update offloadDeletionLagMs configuration for"
+ + " namespace {}: concurrent modification",
+ clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
@@ -2857,15 +2947,17 @@ public abstract class NamespacesBase extends AdminResource {
@Deprecated
protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+ PolicyOperation.READ);
return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
}
protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+ PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
- if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED){
+ if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
}
@@ -2874,18 +2966,20 @@ public abstract class NamespacesBase extends AdminResource {
@Deprecated
protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+ PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
- policies.schema_auto_update_compatibility_strategy = strategy;
- return policies;
- }, (policies) -> policies.schema_auto_update_compatibility_strategy,
- "schemaAutoUpdateCompatibilityStrategy");
+ policies.schema_auto_update_compatibility_strategy = strategy;
+ return policies;
+ }, (policies) -> policies.schema_auto_update_compatibility_strategy,
+ "schemaAutoUpdateCompatibilityStrategy");
}
protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+ PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2896,28 +2990,32 @@ public abstract class NamespacesBase extends AdminResource {
}
protected boolean internalGetSchemaValidationEnforced() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+ PolicyOperation.READ);
return getNamespacePolicies(namespaceName).schema_validation_enforced;
}
protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+ PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
- policies. schema_validation_enforced = schemaValidationEnforced;
- return policies;
- }, (policies) -> policies. schema_validation_enforced,
- "schemaValidationEnforced");
+ policies.schema_validation_enforced = schemaValidationEnforced;
+ return policies;
+ }, (policies) -> policies.schema_validation_enforced,
+ "schemaValidationEnforced");
}
protected boolean internalGetIsAllowAutoUpdateSchema() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+ PolicyOperation.READ);
return getNamespacePolicies(namespaceName).is_allow_auto_update_schema;
}
protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema) {
- validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+ PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2988,8 +3086,8 @@ public abstract class NamespacesBase extends AdminResource {
(rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(),
- namespaceName, updatedOffloadPolicies);
+ log.info("[{}] Successfully updated offload configuration: namespace={}, map={}",
+ clientAppId(), namespaceName, updatedOffloadPolicies);
asyncResponse.resume(Response.noContent().build());
} else {
String errorMsg = String.format(
@@ -3061,8 +3159,8 @@ public abstract class NamespacesBase extends AdminResource {
"The offloadPolicies must be specified for namespace offload.");
}
if (!offloadPolicies.driverSupported()) {
- log.warn("[{}] Failed to update offload configuration for namespace {}: " +
- "driver is not supported, support value: {}",
+ log.warn("[{}] Failed to update offload configuration for namespace {}: "
+ + "driver is not supported, support value: {}",
clientAppId(), namespaceName, OffloadPolicies.getSupportedDriverNames());
throw new RestException(Status.PRECONDITION_FAILED,
"The driver is not supported, support value: " + OffloadPolicies.getSupportedDriverNames());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c0cb9ef..81f35cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -482,7 +482,8 @@ public class PersistentTopicsBase extends AdminResource {
*
* @param numPartitions
*/
- protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly, boolean authoritative) {
+ protected void internalUpdatePartitionedTopic(int numPartitions,
+ boolean updateLocalTopicOnly, boolean authoritative) {
validateWriteOperationOnTopic(authoritative);
// Only do the validation if it's the first hop.
if (!updateLocalTopicOnly) {
@@ -490,7 +491,8 @@ public class PersistentTopicsBase extends AdminResource {
}
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (maxPartitions > 0 && numPartitions > maxPartitions) {
- throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
+ throw new RestException(Status.NOT_ACCEPTABLE,
+ "Number of partitions should be less than or equal to " + maxPartitions);
}
if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
@@ -574,13 +576,15 @@ public class PersistentTopicsBase extends AdminResource {
});
}
}).exceptionally(e -> {
- log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+ log.error("[{}] Failed to create partitions for topic {}",
+ clientAppId(), topicName);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}
- protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse, DelayedDeliveryPolicies deliveryPolicies) {
+ protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse,
+ DelayedDeliveryPolicies deliveryPolicies) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
@@ -593,7 +597,8 @@ public class PersistentTopicsBase extends AdminResource {
topicPolicies = new TopicPolicies();
}
topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
- topicPolicies.setDelayedDeliveryTickTimeMillis(deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
+ topicPolicies.setDelayedDeliveryTickTimeMillis(
+ deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.whenComplete((result, ex) -> {
if (ex != null) {
@@ -606,19 +611,22 @@ public class PersistentTopicsBase extends AdminResource {
}
private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
- List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() -1);
+ List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() - 1);
clusters.forEach(cluster -> {
if (cluster.equals(pulsar().getConfig().getClusterName())) {
return;
}
results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()
- .updatePartitionedTopicAsync(topicName.toString(), numPartitions, true));
+ .updatePartitionedTopicAsync(topicName.toString(),
+ numPartitions, true));
});
return FutureUtil.waitForAll(results);
}
- protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative, boolean checkAllowAutoCreation) {
- PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
+ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
+ boolean checkAllowAutoCreation) {
+ PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
+ authoritative, checkAllowAutoCreation);
if (metadata.partitions > 1) {
validateClientVersion();
}
@@ -661,13 +669,16 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
- pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force)
+ pulsar().getAdminClient().topics()
+ .deleteAsync(topicNamePartition.toString(), force)
.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
// if the sub-topic is not found, the client might not have called create
- // producer or it might have been deleted earlier, so we ignore the 404 error.
- // For all other exception, we fail the delete partition method even if a single
+ // producer or it might have been deleted earlier,
+ //so we ignore the 404 error.
+ // For all other exception,
+ //we fail the delete partition method even if a single
// partition is failed to be deleted
if (log.isDebugEnabled()) {
log.debug("[{}] Partition not found: {}", clientAppId(),
@@ -730,8 +741,10 @@ public class PersistentTopicsBase extends AdminResource {
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
} else {
- log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
- asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+ log.error("[{}] Failed to delete partitioned topic {}", clientAppId(),
+ topicName, KeeperException.create(KeeperException.Code.get(rc2)));
+ asyncResponse.resume(new RestException(
+ KeeperException.create(KeeperException.Code.get(rc2))));
}
}, null);
} catch (Exception e) {
@@ -745,7 +758,8 @@ public class PersistentTopicsBase extends AdminResource {
topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
- log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
+ log.error("[{}] Failed to delete partitioned topic {}", clientAppId(),
+ topicName, KeeperException.create(KeeperException.Code.get(rc)));
asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
}
}, null);
@@ -857,7 +871,8 @@ public class PersistentTopicsBase extends AdminResource {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies cache have not init.", topicName);
- return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
+ return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+ "Policies cache have not init"));
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
@@ -866,30 +881,33 @@ public class PersistentTopicsBase extends AdminResource {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}
- private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies, TopicName topicName) {
+ private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies,
+ TopicName topicName) {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(optionalTopic -> {
- try {
- if (!optionalTopic.isPresent() || !topicName.isPersistent()) {
- return;
- }
- PersistentTopic persistentTopic = (PersistentTopic) optionalTopic.get();
- ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
- if (offloadPolicies == null) {
- LedgerOffloader namespaceOffloader = pulsar().getLedgerOffloaderMap().get(topicName.getNamespaceObject());
- LedgerOffloader topicOffloader = managedLedgerConfig.getLedgerOffloader();
- if (topicOffloader != null && topicOffloader != namespaceOffloader) {
- topicOffloader.close();
+ try {
+ if (!optionalTopic.isPresent() || !topicName.isPersistent()) {
+ return;
+ }
+ PersistentTopic persistentTopic = (PersistentTopic) optionalTopic.get();
+ ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+ if (offloadPolicies == null) {
+ LedgerOffloader namespaceOffloader =
+ pulsar().getLedgerOffloaderMap().get(topicName.getNamespaceObject());
+ LedgerOffloader topicOffloader = managedLedgerConfig.getLedgerOffloader();
+ if (topicOffloader != null && topicOffloader != namespaceOffloader) {
+ topicOffloader.close();
+ }
+ managedLedgerConfig.setLedgerOffloader(namespaceOffloader);
+ } else {
+ managedLedgerConfig.setLedgerOffloader(
+ pulsar().createManagedLedgerOffloader(offloadPolicies));
+ }
+ persistentTopic.getManagedLedger().setConfig(managedLedgerConfig);
+ } catch (PulsarServerException e) {
+ throw new RestException(e);
}
- managedLedgerConfig.setLedgerOffloader(namespaceOffloader);
- } else {
- managedLedgerConfig.setLedgerOffloader(pulsar().createManagedLedgerOffloader(offloadPolicies));
- }
- persistentTopic.getManagedLedger().setConfig(managedLedgerConfig);
- } catch (PulsarServerException e) {
- throw new RestException(e);
- }
- });
+ });
}
protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) {
@@ -923,7 +941,8 @@ public class PersistentTopicsBase extends AdminResource {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies cache have not init.", topicName);
- return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
+ return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+ "Policies cache have not init"));
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
@@ -1006,10 +1025,12 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isPartitioned()) {
internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
+ false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
try {
- // get the subscriptions only from the 1st partition since all the other partitions will have the same
+ // get the subscriptions only from the 1st partition
+ // since all the other partitions will have the same
// subscriptions
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
.whenComplete((r, ex) -> {
@@ -1060,7 +1081,8 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(subscriptions);
} catch (WebApplicationException wae) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}, redirecting to other brokers.",
+ log.debug("[{}] Failed to get subscriptions for non-partitioned topic {},"
+ + " redirecting to other brokers.",
clientAppId(), topicName, wae);
}
resumeAsyncResponseExceptionally(asyncResponse, wae);
@@ -1092,7 +1114,8 @@ public class PersistentTopicsBase extends AdminResource {
boolean includeMetadata = metadata && hasSuperUserAccess();
return topic.getInternalStats(includeMetadata).get();
} catch (Exception e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR, (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
+ throw new RestException(Status.INTERNAL_SERVER_ERROR,
+ (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
}
}
@@ -1111,7 +1134,8 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isPartitioned()) {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<JsonObject>> futures = Lists.newArrayList();
@@ -1121,20 +1145,24 @@ public class PersistentTopicsBase extends AdminResource {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
- .getInternalInfoAsync(topicNamePartition.toString()).whenComplete((jsonObject, throwable) -> {
- if(throwable != null) {
- log.error("[{}] Failed to get managed info for {}", clientAppId(), topicNamePartition, throwable);
- asyncResponse.resume(new RestException(throwable));
- }
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- try {
- partitionedManagedLedgerInfo.partitions.put(topicNamePartition.toString(),
- jsonMapper().readValue(gson.toJson(jsonObject), ManagedLedgerInfo.class));
- } catch (JsonProcessingException ex) {
- log.error("[{}] Failed to parse ManagedLedgerInfo for {} from [{}]", clientAppId(),
- topicNamePartition, gson.toJson(jsonObject), ex);
- }
- }));
+ .getInternalInfoAsync(
+ topicNamePartition.toString()).whenComplete((jsonObject, throwable) -> {
+ if (throwable != null) {
+ log.error("[{}] Failed to get managed info for {}",
+ clientAppId(), topicNamePartition, throwable);
+ asyncResponse.resume(new RestException(throwable));
+ }
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ try {
+ partitionedManagedLedgerInfo.partitions.put(topicNamePartition.toString(),
+ jsonMapper().readValue(gson.toJson(jsonObject),
+ ManagedLedgerInfo.class));
+ } catch (JsonProcessingException ex) {
+ log.error("[{}] Failed to parse ManagedLedgerInfo for {} from [{}]",
+ clientAppId(),
+ topicNamePartition, gson.toJson(jsonObject), ex);
+ }
+ }));
} catch (Exception e) {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicNamePartition, e);
throw new RestException(e);
@@ -1148,7 +1176,7 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
} else {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, t);
- asyncResponse.resume( new RestException(t));
+ asyncResponse.resume(new RestException(t));
}
}
asyncResponse.resume((StreamingOutput) output -> {
@@ -1203,7 +1231,8 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
}
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
return;
@@ -1213,7 +1242,8 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
topicStatsFutureList
- .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()), getPreciseBacklog));
+ .add(pulsar().getAdminClient().topics().getStatsAsync(
+ (topicName.getPartition(i).toString()), getPreciseBacklog));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
@@ -1244,7 +1274,8 @@ public class PersistentTopicsBase extends AdminResource {
stats.partitions.put(topicName.toString(), new TopicStats());
} else {
asyncResponse.resume(
- new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+ new RestException(Status.NOT_FOUND,
+ "Internal topics have not been generated yet"));
return null;
}
} catch (KeeperException | InterruptedException e) {
@@ -1272,7 +1303,8 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
}
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
return;
@@ -1315,7 +1347,8 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean force) {
+ protected void internalDeleteSubscription(AsyncResponse asyncResponse,
+ String subName, boolean authoritative, boolean force) {
if (force) {
internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative);
} else {
@@ -1337,7 +1370,8 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isPartitioned()) {
internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -1347,7 +1381,8 @@ public class PersistentTopicsBase extends AdminResource {
futures.add(pulsar().getAdminClient().topics()
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
} catch (Exception e) {
- log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName,
+ log.error("[{}] Failed to delete subscription {} {}",
+ clientAppId(), topicNamePartition, subName,
e);
asyncResponse.resume(new RestException(e));
return;
@@ -1365,7 +1400,8 @@ public class PersistentTopicsBase extends AdminResource {
"Subscription has active connected consumers"));
return null;
} else {
- log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t);
+ log.error("[{}] Failed to delete subscription {} {}",
+ clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
@@ -1378,14 +1414,16 @@ public class PersistentTopicsBase extends AdminResource {
internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
}
}).exceptionally(ex -> {
- log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, ex);
+ log.error("[{}] Failed to delete subscription {} from topic {}",
+ clientAppId(), subName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}
- private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
+ String subName, boolean authoritative) {
try {
validateAdminAccessForSubscriber(subName, authoritative);
Topic topic = getTopicReference(topicName);
@@ -1415,12 +1453,14 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+ protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
+ String subName, boolean authoritative) {
if (topicName.isGlobal()) {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
- log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e);
+ log.error("[{}] Failed to delete subscription forcefully {} from topic {}",
+ clientAppId(), subName, topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
@@ -1429,7 +1469,8 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isPartitioned()) {
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -1437,10 +1478,11 @@ public class PersistentTopicsBase extends AdminResource {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
- .deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
+ .deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
} catch (Exception e) {
- log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicNamePartition, subName,
- e);
+ log.error("[{}] Failed to delete subscription forcefully {} {}",
+ clientAppId(), topicNamePartition, subName,
+ e);
asyncResponse.resume(new RestException(e));
return;
}
@@ -1453,7 +1495,8 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return null;
} else {
- log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, t);
+ log.error("[{}] Failed to delete subscription forcefully {} {}",
+ clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
@@ -1466,14 +1509,16 @@ public class PersistentTopicsBase extends AdminResource {
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
}
}).exceptionally(ex -> {
- log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, ex);
+ log.error("[{}] Failed to delete subscription forcefully {} from topic {}",
+ clientAppId(), subName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}
- private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+ private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse,
+ String subName, boolean authoritative) {
try {
validateAdminAccessForSubscriber(subName, authoritative);
Topic topic = getTopicReference(topicName);
@@ -1488,12 +1533,14 @@ public class PersistentTopicsBase extends AdminResource {
} catch (Exception e) {
if (e instanceof WebApplicationException) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to delete subscription forcefully from topic {}, redirecting to other brokers.",
+ log.debug("[{}] Failed to delete subscription forcefully from topic {},"
+ + " redirecting to other brokers.",
clientAppId(), topicName, e);
}
asyncResponse.resume(e);
} else {
- log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e);
+ log.error("[{}] Failed to delete subscription forcefully {} {}",
+ clientAppId(), topicName, subName, e);
asyncResponse.resume(new RestException(e));
}
}
@@ -1504,7 +1551,8 @@ public class PersistentTopicsBase extends AdminResource {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
- log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
+ log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
+ clientAppId(), subName, topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
@@ -1513,17 +1561,22 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isPartitioned()) {
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
- futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(),
- subName));
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+ .skipAllMessagesAsync(topicNamePartition.toString(),
+ subName));
} catch (Exception e) {
- log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e);
+ log.error("[{}] Failed to skip all messages {} {}",
+ clientAppId(), topicNamePartition, subName, e);
asyncResponse.resume(new RestException(e));
return;
}
@@ -1536,7 +1589,8 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return null;
} else {
- log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t);
+ log.error("[{}] Failed to skip all messages {} {}",
+ clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
@@ -1549,14 +1603,16 @@ public class PersistentTopicsBase extends AdminResource {
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
}
}).exceptionally(ex -> {
- log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, ex);
+ log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
+ clientAppId(), subName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
+ return null;
});
}
}
- private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+ private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncResponse,
+ String subName, boolean authoritative) {
try {
validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
@@ -1587,12 +1643,14 @@ public class PersistentTopicsBase extends AdminResource {
}
} catch (WebApplicationException wae) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to skip all messages for subscription on topic {}, redirecting to other brokers.",
+ log.debug("[{}] Failed to skip all messages for subscription on topic {},"
+ + " redirecting to other brokers.",
clientAppId(), topicName, wae);
}
resumeAsyncResponseExceptionally(asyncResponse, wae);
} catch (Exception e) {
- log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
+ log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
+ clientAppId(), subName, topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
@@ -1601,7 +1659,8 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
+ PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName,
+ authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
}
@@ -1634,16 +1693,19 @@ public class PersistentTopicsBase extends AdminResource {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
- log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
+ log.error("[{}] Failed to expire messages for all subscription on topic {}",
+ clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
- internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
+ internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+ expireTimeInSeconds, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -1651,10 +1713,14 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
- futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(
- topicNamePartition.toString(), expireTimeInSeconds));
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+ .expireMessagesForAllSubscriptionsAsync(
+ topicNamePartition.toString(), expireTimeInSeconds));
} catch (Exception e) {
- log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
+ log.error("[{}] Failed to expire messages up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
@@ -1664,7 +1730,8 @@ public class PersistentTopicsBase extends AdminResource {
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
- log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
+ log.error("[{}] Failed to expire messages up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
topicName, t);
asyncResponse.resume(new RestException(t));
return null;
@@ -1674,18 +1741,21 @@ public class PersistentTopicsBase extends AdminResource {
return null;
});
} else {
- internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
+ internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+ expireTimeInSeconds, authoritative);
}
}).exceptionally(ex -> {
- log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex);
+ log.error("[{}] Failed to expire messages for all subscription on topic {}",
+ clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}
- private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, int expireTimeInSeconds,
- boolean authoritative) {
+ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse,
+ int expireTimeInSeconds,
+ boolean authoritative) {
// validate ownership and redirect if current broker is not owner
PersistentTopic topic;
try {
@@ -1694,13 +1764,15 @@ public class PersistentTopicsBase extends AdminResource {
topic = (PersistentTopic) getTopicReference(topicName);
} catch (WebApplicationException wae) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to expire messages for all subscription on topic {}, redirecting to other brokers.",
+ log.debug("[{}] Failed to expire messages for all subscription on topic {},"
+ + " redirecting to other brokers.",
clientAppId(), topicName, wae);
}
resumeAsyncResponseExceptionally(asyncResponse, wae);
return;
} catch (Exception e) {
- log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
+ log.error("[{}] Failed to expire messages for all subscription on topic {}",
+ clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
@@ -1742,7 +1814,8 @@ public class PersistentTopicsBase extends AdminResource {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}", clientAppId(), topicName,
+ log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}",
+ clientAppId(), topicName,
subName, timestamp, e.getMessage());
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
@@ -1752,7 +1825,8 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isPartitioned()) {
internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1764,7 +1838,8 @@ public class PersistentTopicsBase extends AdminResource {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
- .resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> {
+ .resetCursorAsync(topicNamePartition.toString(),
+ subName, timestamp).handle((r, ex) -> {
if (ex != null) {
if (ex instanceof PreconditionFailedException) {
// throw the last exception if all partitions get this error
@@ -1805,14 +1880,16 @@ public class PersistentTopicsBase extends AdminResource {
// report an error to user if unable to reset for all partitions
if (failureCount.get() == numPartitions) {
- log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
+ log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+ clientAppId(), topicName,
subName, timestamp, partitionException.get());
asyncResponse.resume(
- new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage()));
+ new RestException(Status.PRECONDITION_FAILED,
+ partitionException.get().getMessage()));
return;
} else if (failureCount.get() > 0) {
- log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(),
- topicName, subName, timestamp, partitionException.get());
+ log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
+ clientAppId(), topicName, subName, timestamp, partitionException.get());
}
asyncResponse.resume(Response.noContent().build());
@@ -1821,7 +1898,8 @@ public class PersistentTopicsBase extends AdminResource {
internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
}
}).exceptionally(ex -> {
- log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex);
+ log.error("[{}] Failed to expire messages for all subscription on topic {}",
+ clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -1832,8 +1910,8 @@ public class PersistentTopicsBase extends AdminResource {
boolean authoritative) {
try {
validateAdminAccessForSubscriber(subName, authoritative);
- log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
- timestamp);
+ log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+ clientAppId(), topicName, subName, timestamp);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
@@ -1864,8 +1942,8 @@ public class PersistentTopicsBase extends AdminResource {
return null;
});
} catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
- timestamp, e);
+ log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}",
+ clientAppId(), topicName, subName, timestamp, e);
if (e instanceof NotAllowedException) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
} else {
@@ -1880,7 +1958,8 @@ public class PersistentTopicsBase extends AdminResource {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
- log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, e);
+ log.error("[{}] Failed to create subscription {} on topic {}",
+ clientAppId(), subscriptionName, topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
@@ -1890,10 +1969,12 @@ public class PersistentTopicsBase extends AdminResource {
targetMessageId);
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
- internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
+ internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
+ subscriptionName, targetMessageId, authoritative, replicated);
} else {
boolean allowAutoTopicCreation = pulsar().getConfiguration().isAllowAutoTopicCreation();
- getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1906,10 +1987,12 @@ public class PersistentTopicsBase extends AdminResource {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
- .createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, targetMessageId)
+ .createSubscriptionAsync(topicNamePartition.toString(),
+ subscriptionName, targetMessageId)
.handle((r, ex) -> {
if (ex != null) {
- // fail the operation on unknown exception or if all the partitioned failed due to
+ // fail the operation on unknown exception or
+ // if all the partitions failed due to
// subscription-already-exist
if (failureCount.incrementAndGet() == numPartitions
|| !(ex instanceof PulsarAdminException.ConflictException)) {
@@ -1942,10 +2025,12 @@ public class PersistentTopicsBase extends AdminResource {
}
if (partitionException.get() != null) {
- log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
+ log.warn("[{}] [{}] Failed to create subscription {} at message id {}",
+ clientAppId(), topicName,
subscriptionName, targetMessageId, partitionException.get());
if (partitionException.get() instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException((PulsarAdminException) partitionException.get()));
+ asyncResponse.resume(
+ new RestException((PulsarAdminException) partitionException.get()));
return;
} else {
asyncResponse.resume(new RestException(partitionException.get()));
@@ -1956,18 +2041,21 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(Response.noContent().build());
});
} else {
- internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
+ internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
+ subscriptionName, targetMessageId, authoritative, replicated);
}
}).exceptionally(ex -> {
- log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, ex);
+ log.error("[{}] Failed to create subscription {} on topic {}",
+ clientAppId(), subscriptionName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}
- private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subscriptionName,
- MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
+ private void internalCreateSubscriptionForNonPartitionedTopic(
+ AsyncResponse asyncResponse, String subscriptionName,
+ MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
try {
validateAdminAccessForSubscriber(subscriptionName, authoritative);
@@ -2051,10 +2139,12 @@ public class PersistentTopicsBase extends AdminResource {
if (batchIndex >= 0) {
try {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
- ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(), messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
+ ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(),
+ messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- // Since we can't read the message from the storage layer, it might be an already delete message ID or an invalid message ID
+ // Since we can't read the message from the storage layer,
+ // it might be an already deleted message ID or an invalid message ID
// We should fall back to non-batch index seek.
batchSizeFuture.complete(0);
}
@@ -2066,7 +2156,8 @@ public class PersistentTopicsBase extends AdminResource {
if (entry == null) {
batchSizeFuture.complete(0);
} else {
- MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+ MessageMetadata metadata =
+ Commands.parseMessageMetadata(entry.getDataBuffer());
batchSizeFuture.complete(metadata.getNumMessagesInBatch());
}
} catch (Exception e) {
@@ -2099,7 +2190,8 @@ public class PersistentTopicsBase extends AdminResource {
bitSet.clear(0, Math.max(batchIndex + 1, 0));
if (bitSet.length() > 0) {
ackSet = bitSet.toLongArray();
- seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
+ seekPosition = PositionImpl.get(messageId.getLedgerId(),
+ messageId.getEntryId(), ackSet);
} else {
seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
seekPosition = seekPosition.getNext();
@@ -2108,7 +2200,8 @@ public class PersistentTopicsBase extends AdminResource {
if (batchIndex - 1 >= 0) {
bitSet.clear(0, batchIndex);
ackSet = bitSet.toLongArray();
- seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
+ seekPosition = PositionImpl.get(messageId.getLedgerId(),
+ messageId.getEntryId(), ackSet);
} else {
seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
}
@@ -2188,7 +2281,8 @@ public class PersistentTopicsBase extends AdminResource {
protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
verifyReadOperation(authoritative);
// If the topic name is a partition name, no need to get partition topic metadata again
- if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+ if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+ authoritative, false).partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
}
validateAdminAccessForSubscriber(subName, authoritative);
@@ -2233,14 +2327,17 @@ public class PersistentTopicsBase extends AdminResource {
validateGlobalNamespaceOwnership(namespaceName);
}
- if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
- throw new RestException(Status.METHOD_NOT_ALLOWED, "Examine messages on a partitioned topic is not allowed, " +
- "please try examine message on specific topic partition");
+ if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+ authoritative, false).partitions > 0) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Examine messages on a partitioned topic is not allowed, "
+ + "please try examine message on specific topic partition");
}
validateTopicOwnership(topicName, authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
- throw new RestException(Status.METHOD_NOT_ALLOWED, "Examine messages on a non-persistent topic is not allowed");
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Examine messages on a non-persistent topic is not allowed");
}
if (messagePosition < 1) {
@@ -2255,7 +2352,8 @@ public class PersistentTopicsBase extends AdminResource {
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
long totalMessage = topic.getNumberOfEntries();
PositionImpl startPosition = topic.getFirstPosition();
- long messageToSkip = initialPosition.equals("earliest")? messagePosition : totalMessage - messagePosition + 1;
+ long messageToSkip =
+ initialPosition.equals("earliest") ? messagePosition : totalMessage - messagePosition + 1;
CompletableFuture<Entry> future = new CompletableFuture<>();
PositionImpl readPosition = topic.getPositionAfterN(startPosition, messageToSkip);
topic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback() {
@@ -2376,7 +2474,8 @@ public class PersistentTopicsBase extends AdminResource {
return offlineTopicStats;
}
- protected void internalSetBacklogQuota(AsyncResponse asyncResponse, BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+ protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
+ BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
@@ -2399,25 +2498,25 @@ public class PersistentTopicsBase extends AdminResource {
}
RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
- if(!checkBacklogQuota(backlogQuota,retentionPolicies)){
+ if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
log.warn(
"[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Backlog Quota exceeds configured retention quota for topic. " +
- "Please increase retention quota and retry"));
+ "Backlog Quota exceeds configured retention quota for topic. "
+ + "Please increase retention quota and retry"));
}
- if(backlogQuota != null){
+ if (backlogQuota != null) {
topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), backlogQuota);
- }else {
+ } else {
topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
}
Map<String, BacklogQuota> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.whenComplete((r, ex) -> {
if (ex != null) {
- log.error("Failed updated backlog quota map",ex);
+ log.error("Failed updated backlog quota map", ex);
asyncResponse.resume(new RestException(ex));
} else {
try {
@@ -2474,7 +2573,7 @@ public class PersistentTopicsBase extends AdminResource {
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.whenComplete((result, ex) -> {
if (ex != null) {
- log.error("Failed set message ttl for topic",ex);
+ log.error("Failed set message ttl for topic", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
@@ -2518,7 +2617,7 @@ public class PersistentTopicsBase extends AdminResource {
.map(TopicPolicies::getRetentionPolicies);
if (!retention.isPresent()) {
asyncResponse.resume(Response.noContent().build());
- }else {
+ } else {
asyncResponse.resume(retention.get());
}
}
@@ -2537,17 +2636,17 @@ public class PersistentTopicsBase extends AdminResource {
.orElseGet(TopicPolicies::new);
BacklogQuota backlogQuota =
topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
- if (backlogQuota == null){
+ if (backlogQuota == null) {
Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
}
- if(!checkBacklogQuota(backlogQuota, retention)){
+ if (!checkBacklogQuota(backlogQuota, retention)) {
log.warn(
"[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
clientAppId(), topicName);
throw new RestException(Status.PRECONDITION_FAILED,
- "Retention Quota must exceed configured backlog quota for topic. " +
- "Please increase retention quota and retry");
+ "Retention Quota must exceed configured backlog quota for topic. "
+ + "Please increase retention quota and retry");
}
topicPolicies.setRetentionPolicies(retention);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
@@ -2610,8 +2709,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSize) {
if (maxMessageSize != null && (maxMessageSize < 0 || maxMessageSize > config().getMaxMessageSize())) {
throw new RestException(Status.PRECONDITION_FAILED
- , "topic-level maxMessageSize must be greater than or equal to 0 " +
- "and must be smaller than that in the broker-level");
+ , "topic-level maxMessageSize must be greater than or equal to 0 "
+ + "and must be smaller than that in the broker-level");
}
validateAdminAccessForTenant(namespaceName.getTenant());
@@ -2757,12 +2856,13 @@ public class PersistentTopicsBase extends AdminResource {
try {
futures.add(pulsar().getAdminClient().topics()
.terminateTopicAsync(topicNamePartition.toString()).whenComplete((messageId, throwable) -> {
- if(throwable != null) {
- log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition, throwable);
- asyncResponse.resume(new RestException(throwable));
- }
- messageIds.add(messageId);
- }));
+ if (throwable != null) {
+ log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition,
+ throwable);
+ asyncResponse.resume(new RestException(throwable));
+ }
+ messageIds.add(messageId);
+ }));
} catch (Exception e) {
log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition, e);
throw new RestException(e);
@@ -2774,8 +2874,8 @@ public class PersistentTopicsBase extends AdminResource {
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
} else {
- log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t);
- asyncResponse.resume( new RestException(t));
+ log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t);
+ asyncResponse.resume(new RestException(t));
}
}
asyncResponse.resume(messageIds);
@@ -2811,8 +2911,11 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
- futures.add(pulsar().getAdminClient().topics().expireMessagesAsync(topicNamePartition.toString(),
- subName, expireTimeInSeconds));
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+ .expireMessagesAsync(topicNamePartition.toString(),
+ subName, expireTimeInSeconds));
} catch (Exception e) {
log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
topicNamePartition, e);
@@ -2828,8 +2931,9 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return null;
} else {
- log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
- topicName, t);
+ log.error("[{}] Failed to expire messages up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
+ topicName, t);
asyncResponse.resume(new RestException(t));
return null;
}
@@ -2928,9 +3032,13 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
- futures.add(pulsar().getAdminClient().topics().triggerCompactionAsync(topicNamePartition.toString()));
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+ .triggerCompactionAsync(topicNamePartition.toString()));
} catch (Exception e) {
- log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicNamePartition, e);
+ log.error("[{}] Failed to trigger compaction on topic {}",
+ clientAppId(), topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
@@ -2946,7 +3054,8 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(th);
return null;
} else {
- log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, exception);
+ log.error("[{}] Failed to trigger compaction on topic {}",
+ clientAppId(), topicName, exception);
asyncResponse.resume(new RestException(exception));
return null;
}
@@ -3011,8 +3120,9 @@ public class PersistentTopicsBase extends AdminResource {
return topic.offloadStatus();
}
- public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
- String clientAppId, String originalPrincipal, AuthenticationDataSource authenticationData, TopicName topicName) {
+ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
+ PulsarService pulsar, String clientAppId, String originalPrincipal,
+ AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// (1) authorize client
@@ -3020,7 +3130,8 @@ public class PersistentTopicsBase extends AdminResource {
checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
} catch (RestException e) {
try {
- validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
+ validateAdminAccessForTenant(pulsar,
+ clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
@@ -3084,7 +3195,7 @@ public class PersistentTopicsBase extends AdminResource {
}
/**
- * Get the Topic object reference from the Pulsar broker
+ * Get the Topic object reference from the Pulsar broker.
*/
private Topic getTopicReference(TopicName topicName) {
try {
@@ -3106,8 +3217,8 @@ public class PersistentTopicsBase extends AdminResource {
PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(
TopicName.get(topicName.getPartitionedTopicName()), false, false);
if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
- final String topicErrorType = partitionedTopicMetadata == null ?
- "has no metadata" : "has zero partitions";
+ final String topicErrorType = partitionedTopicMetadata
+ == null ? "has no metadata" : "has zero partitions";
return new RestException(Status.NOT_FOUND, String.format(
"Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
} else if (!internalGetList().contains(topicName.toString())) {
@@ -3117,17 +3228,19 @@ public class PersistentTopicsBase extends AdminResource {
}
private Topic getOrCreateTopic(TopicName topicName) {
- return pulsar().getBrokerService().getTopic(topicName.toString(), true).thenApply(Optional::get).join();
+ return pulsar().getBrokerService().getTopic(
+ topicName.toString(), true).thenApply(Optional::get).join();
}
/**
- * Get the Subscription object reference from the Topic reference
+ * Get the Subscription object reference from the Topic reference.
*/
private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
try {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
- sub = topic.createSubscription(subName, InitialPosition.Earliest, false).get();
+ sub = topic.createSubscription(subName,
+ InitialPosition.Earliest, false).get();
}
return checkNotNull(sub);
@@ -3137,7 +3250,7 @@ public class PersistentTopicsBase extends AdminResource {
}
/**
- * Get the Replicator object reference from the Topic reference
+ * Get the Replicator object reference from the Topic reference.
*/
private PersistentReplicator getReplicatorReference(String replName, PersistentTopic topic) {
try {
@@ -3176,12 +3289,10 @@ public class PersistentTopicsBase extends AdminResource {
}
/**
- * It creates subscriptions for new partitions of existing partitioned-topics
+ * It creates subscriptions for new partitions of existing partitioned-topics.
*
- * @param topicName
- * : topic-name: persistent://prop/cluster/ns/topic
- * @param numPartitions
- * : number partitions for the topics
+ * @param topicName : topic-name: persistent://prop/cluster/ns/topic
+ * @param numPartitions : number partitions for the topics
*/
private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
CompletableFuture<Void> result = new CompletableFuture<>();
@@ -3228,7 +3339,8 @@ public class PersistentTopicsBase extends AdminResource {
log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
result.complete(null);
}).exceptionally(ex -> {
- log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex);
+ log.warn("[{}] Failed to create subscriptions on new partitions for {}",
+ clientAppId(), topicName, ex);
result.completeExceptionally(ex);
return null;
});
@@ -3237,13 +3349,15 @@ public class PersistentTopicsBase extends AdminResource {
// The first partition doesn't exist, so there are currently no subscriptions to recreate
result.complete(null);
} else {
- log.warn("[{}] Failed to get list of subscriptions of {}", clientAppId(), topicName.getPartition(0), ex);
+ log.warn("[{}] Failed to get list of subscriptions of {}",
+ clientAppId(), topicName.getPartition(0), ex);
result.completeExceptionally(ex);
}
return null;
});
}).exceptionally(ex -> {
- log.warn("[{}] Failed to get partition metadata for {}", clientAppId(), topicName.toString());
+ log.warn("[{}] Failed to get partition metadata for {}",
+ clientAppId(), topicName.toString());
result.completeExceptionally(ex);
return null;
});
@@ -3262,7 +3376,8 @@ public class PersistentTopicsBase extends AdminResource {
final String userAgent = httpRequest.getHeader("User-Agent");
if (StringUtils.isBlank(userAgent)) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Client lib is not compatible to access partitioned metadata: version in user-agent is not present");
+ "Client lib is not compatible to"
+ + " access partitioned metadata: version in user-agent is not present");
}
// Version < 1.20 for cpp-client is not allowed
if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
@@ -3307,15 +3422,21 @@ public class PersistentTopicsBase extends AdminResource {
long suffix = Long.parseLong(exsitingTopicName.substring(
exsitingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
- // Skip partition of partitioned topic by making sure the numeric suffix greater than old partition number.
+ // Skip partition of partitioned topic by making sure
+ // the numeric suffix is greater than the old partition number.
if (suffix >= oldPartition && suffix <= (long) numberOfPartition) {
- log.warn("[{}] Already have non partition topic {} which contains partition " +
- "suffix '-partition-' and end with numeric value smaller than the new number of partition. " +
- "Update of partitioned topic {} could cause conflict.", clientAppId(), exsitingTopicName, topicName);
+ log.warn(
+ "[{}] Already have non partition topic {} which contains partition suffix"
+ + " '-partition-' and end with numeric value smaller than the new number"
+ + " of partition. Update of partitioned topic {} could cause conflict.",
+ clientAppId(),
+ exsitingTopicName, topicName);
throw new RestException(Status.PRECONDITION_FAILED,
- "Already have non partition topic" + exsitingTopicName + " which contains partition suffix '-partition-' " +
- "and end with numeric value and end with numeric value smaller than the new " +
- "number of partition. Update of partitioned topic " + topicName + " could cause conflict.");
+ "Already have non partition topic" + exsitingTopicName
+ + " which contains partition suffix '-partition-' "
+ + "and end with numeric value and end with numeric value smaller than the new "
+ + "number of partition. Update of partitioned topic "
+ + topicName + " could cause conflict.");
}
} catch (NumberFormatException e) {
// Do nothing, if value after partition suffix is not pure numeric value,
@@ -3343,26 +3464,27 @@ public class PersistentTopicsBase extends AdminResource {
int partitionIndex = topicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
long suffix = Long.parseLong(topicName.substring(partitionIndex
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
- TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName.substring(0, partitionIndex));
+ TopicName partitionTopicName = TopicName.get(domain(),
+ namespaceName, topicName.substring(0, partitionIndex));
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
// Partition topic index is 0 to (number of partition - 1)
if (metadata.partitions > 0 && suffix >= (long) metadata.partitions) {
- log.warn("[{}] Can't create topic {} with \"-partition-\" followed by" +
- " a number smaller then number of partition of partitioned topic {}.",
+ log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
+ + " a number smaller then number of partition of partitioned topic {}.",
clientAppId(), topicName, partitionTopicName.getLocalName());
throw new RestException(Status.PRECONDITION_FAILED,
- "Can't create topic " + topicName + " with \"-partition-\" followed by" +
- " a number smaller then number of partition of partitioned topic " +
- partitionTopicName.getLocalName());
+ "Can't create topic " + topicName + " with \"-partition-\" followed by"
+ + " a number smaller then number of partition of partitioned topic "
+ + partitionTopicName.getLocalName());
} else if (metadata.partitions == 0) {
- log.warn("[{}] Can't create topic {} with \"-partition-\" followed by" +
- " numeric value if there isn't a partitioned topic {} created.",
+ log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
+ + " numeric value if there isn't a partitioned topic {} created.",
clientAppId(), topicName, partitionTopicName.getLocalName());
throw new RestException(Status.PRECONDITION_FAILED,
- "Can't create topic " + topicName + " with \"-partition-\" followed by" +
- " numeric value if there isn't a partitioned topic " +
- partitionTopicName.getLocalName() + " created.");
+ "Can't create topic " + topicName + " with \"-partition-\" followed by"
+ + " numeric value if there isn't a partitioned topic "
+ + partitionTopicName.getLocalName() + " created.");
}
// If there is a partitioned topic with the same name and numeric suffix is smaller than the
// number of partition for that partitioned topic, validation will pass.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index 81addf6..a38ef6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -66,77 +66,96 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request (The Pulsar Sink already exists, etc.)"),
@ApiResponse(code = 200, message = "Pulsar Sink successfully created"),
- @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to get tenant data, failed to process package, etc.)"),
+ @ApiResponse(code = 500, message =
+ "Internal server error (failed to authorize,"
+ + " failed to get tenant data, failed to process package, etc.)"),
@ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
- public void registerSink(@ApiParam(value = "The tenant of a Pulsar Sink")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Sink")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Sink")
- final @PathParam("sinkName") String sinkName,
+ public void registerSink(@ApiParam(value = "The tenant of a Pulsar Sink") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Sink") final @PathParam("namespace")
+ String namespace,
+ @ApiParam(value = "The name of a Pulsar Sink") final @PathParam("sinkName")
+ String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String sinkPkgUrl,
- @ApiParam(
- value =
- "A JSON value presenting config payload of a Pulsar Sink. All available configuration options are: \n" +
- "- **classname** \n" +
- " The class name of a Pulsar Sink if archive is file-url-path (file://) \n" +
- "- **sourceSubscriptionName** \n" +
- " Pulsar source subscription name if user wants a specific \n" +
- " subscription-name for input-topic consumer \n" +
- "- **inputs** \n" +
- " The input topic or topics of a Pulsar Sink (specified as a JSON array) \n" +
- "- **topicsPattern** \n" +
- " TopicsPattern to consume from list of topics under a namespace that " +
- " match the pattern. [input] and [topicsPattern] are mutually " +
- " exclusive. Add SerDe class name for a pattern in customSerdeInputs " +
- " (supported for java fun only)" +
- "- **topicToSerdeClassName** \n" +
- " The map of input topics to SerDe class names (specified as a JSON object) \n" +
- "- **topicToSchemaType** \n" +
- " The map of input topics to Schema types or class names (specified as a JSON object) \n" +
- "- **inputSpecs** \n" +
- " The map of input topics to its consumer configuration, each configuration has schema of " +
- " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5} \n" +
- "- **configs** \n" +
- " The map of configs (specified as a JSON object) \n" +
- "- **secrets** \n" +
- " a map of secretName(aka how the secret is going to be \n" +
- " accessed in the function via context) to an object that \n" +
- " encapsulates how the secret is fetched by the underlying \n" +
- " secrets provider. The type of an value here can be found by the \n" +
- " SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object) \n" +
- "- **parallelism** \n" +
- " The parallelism factor of a Pulsar Sink (i.e. the number of a Pulsar Sink instances to run \n" +
- "- **processingGuarantees** \n" +
- " The processing guarantees (aka delivery semantics) applied to the Pulsar Sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\" \n" +
- "- **retainOrdering** \n" +
- " Boolean denotes whether the Pulsar Sink consumes and processes messages in order \n" +
- "- **resources** \n" +
- " {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per Pulsar Sink instance (applicable only to Docker runtime) \n" +
- "- **autoAck** \n" +
- " Boolean denotes whether or not the framework will automatically acknowledge messages \n" +
- "- **timeoutMs** \n" +
- " Long denotes the message timeout in milliseconds \n" +
- "- **cleanupSubscription** \n" +
- " Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted \n" +
- "- **runtimeFlags** \n" +
- " Any flags that you want to pass to the runtime as a single string \n",
- examples = @Example(
- value = @ExampleProperty(
- mediaType = MediaType.APPLICATION_JSON,
- value = "{ \n" +
- "\t\"classname\": \"org.example.MySinkTest\",\n" +
- "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" +
- "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" +
- "\t\"parallelism\": 10\n" +
- "}"
- )
+ @ApiParam(value =
+ "A JSON value presenting config payload of a Pulsar Sink."
+ + " All available configuration options are:\n"
+ + "- **classname**\n"
+ + " The class name of a Pulsar Sink if"
+ + " archive is file-url-path (file://)\n"
+ + "- **sourceSubscriptionName**\n"
+ + " Pulsar source subscription name if"
+ + " user wants a specific\n"
+ + " subscription-name for input-topic consumer\n"
+ + "- **inputs**\n"
+ + " The input topic or topics of"
+ + " a Pulsar Sink (specified as a JSON array)\n"
+ + "- **topicsPattern**\n"
+ + " TopicsPattern to consume from list of topics under a namespace that "
+ + " match the pattern. [input] and [topicsPattern] are mutually "
+ + " exclusive. Add SerDe class name for a pattern in customSerdeInputs "
+ + " (supported for java fun only)"
+ + "- **topicToSerdeClassName**\n"
+ + " The map of input topics to SerDe class names"
+ + " (specified as a JSON object)\n"
+ + "- **topicToSchemaType**\n"
+ + " The map of input topics to Schema types or class names"
+ + " (specified as a JSON object)\n"
+ + "- **inputSpecs**\n"
+ + " The map of input topics to its consumer configuration,"
+ + " each configuration has schema of "
+ + " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
+ + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
+ + "- **configs**\n"
+ + " The map of configs (specified as a JSON object)\n"
+ + "- **secrets**\n"
+ + " a map of secretName(aka how the secret is going to be \n"
+ + " accessed in the function via context) to an object that \n"
+ + " encapsulates how the secret is fetched by the underlying \n"
+ + " secrets provider. The type of an value here can be found by the \n"
+ + " SecretProviderConfigurator.getSecretObjectType() method."
+ + " (specified as a JSON object)\n"
+ + "- **parallelism**\n"
+ + " The parallelism factor of a Pulsar Sink"
+ + " (i.e. the number of a Pulsar Sink instances to run \n"
+ + "- **processingGuarantees**\n"
+ + " The processing guarantees (aka delivery semantics) applied to"
+ + " the Pulsar Sink. Possible Values: \"ATLEAST_ONCE\","
+ + " \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\"\n"
+ + "- **retainOrdering**\n"
+ + " Boolean denotes whether the Pulsar Sink"
+ + " consumes and processes messages in order\n"
+ + "- **resources**\n"
+ + " {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores),"
+ + " RAM (in bytes) and disk (in bytes) that needs to be "
+ + "allocated per Pulsar Sink instance "
+ + "(applicable only to Docker runtime)\n"
+ + "- **autoAck**\n"
+ + " Boolean denotes whether or not the framework"
+ + " will automatically acknowledge messages\n"
+ + "- **timeoutMs**\n"
+ + " Long denotes the message timeout in milliseconds\n"
+ + "- **cleanupSubscription**\n"
+ + " Boolean denotes whether the subscriptions the functions"
+ + " created/used should be deleted when the functions is deleted\n"
+ + "- **runtimeFlags**\n"
+ + " Any flags that you want to pass to the runtime as a single string\n",
+ examples = @Example(
+ value = @ExampleProperty(
+ mediaType = MediaType.APPLICATION_JSON,
+ value = "{\n"
+ + "\t\"classname\": \"org.example.MySinkTest\",\n"
+ + "\t\"inputs\": ["
+ + "\"persistent://public/default/sink-input\"],\n"
+ + "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n"
+ + "\t\"parallelism\": 10\n"
+ + "}"
+ )
)
)
final @FormDataParam("sinkConfig") SinkConfig sinkConfig) {
@@ -147,80 +166,96 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@PUT
@ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode")
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request (The Pulsar Sink doesn't exist, update contains no change, etc.)"),
+ @ApiResponse(code = 400, message =
+ "Invalid request (The Pulsar Sink doesn't exist, update contains no change, etc.)"),
@ApiResponse(code = 200, message = "Pulsar Sink successfully updated"),
@ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
@ApiResponse(code = 404, message = "The Pulsar Sink doesn't exist"),
- @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to process package, etc.)"),
+ @ApiResponse(code = 500, message =
+ "Internal server error (failed to authorize, failed to process package, etc.)"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
- public void updateSink(@ApiParam(value = "The tenant of a Pulsar Sink")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Sink")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Sink")
- final @PathParam("sinkName") String sinkName,
+ public void updateSink(@ApiParam(value = "The tenant of a Pulsar Sink") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Sink") final @PathParam("namespace")
+ String namespace,
+ @ApiParam(value = "The name of a Pulsar Sink") final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String sinkPkgUrl,
- @ApiParam(
- value =
- "A JSON value presenting config payload of a Pulsar Sink. All available configuration options are: \n" +
- "- **classname** \n" +
- " The class name of a Pulsar Sink if archive is file-url-path (file://) \n" +
- "- **sourceSubscriptionName** \n" +
- " Pulsar source subscription name if user wants a specific \n" +
- " subscription-name for input-topic consumer \n" +
- "- **inputs** \n" +
- " The input topic or topics of a Pulsar Sink (specified as a JSON array) \n" +
- "- **topicsPattern** \n" +
- " TopicsPattern to consume from list of topics under a namespace that " +
- " match the pattern. [input] and [topicsPattern] are mutually " +
- " exclusive. Add SerDe class name for a pattern in customSerdeInputs " +
- " (supported for java fun only)" +
- "- **topicToSerdeClassName** \n" +
- " The map of input topics to SerDe class names (specified as a JSON object) \n" +
- "- **topicToSchemaType** \n" +
- " The map of input topics to Schema types or class names (specified as a JSON object) \n" +
- "- **inputSpecs** \n" +
- " The map of input topics to its consumer configuration, each configuration has schema of " +
- " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5} \n" +
- "- **configs** \n" +
- " The map of configs (specified as a JSON object) \n" +
- "- **secrets** \n" +
- " a map of secretName(aka how the secret is going to be \n" +
- " accessed in the function via context) to an object that \n" +
- " encapsulates how the secret is fetched by the underlying \n" +
- " secrets provider. The type of an value here can be found by the \n" +
- " SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object) \n" +
- "- **parallelism** \n" +
- " The parallelism factor of a Pulsar Sink (i.e. the number of a Pulsar Sink instances to run \n" +
- "- **processingGuarantees** \n" +
- " The processing guarantees (aka delivery semantics) applied to the Pulsar Sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\" \n" +
- "- **retainOrdering** \n" +
- " Boolean denotes whether the Pulsar Sink consumes and processes messages in order \n" +
- "- **resources** \n" +
- " {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per Pulsar Sink instance (applicable only to Docker runtime) \n" +
- "- **autoAck** \n" +
- " Boolean denotes whether or not the framework will automatically acknowledge messages \n" +
- "- **timeoutMs** \n" +
- " Long denotes the message timeout in milliseconds \n" +
- "- **cleanupSubscription** \n" +
- " Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted \n" +
- "- **runtimeFlags** \n" +
- " Any flags that you want to pass to the runtime as a single string \n",
- examples = @Example(
- value = @ExampleProperty(
- mediaType = MediaType.APPLICATION_JSON,
- value = "{ \n" +
- "\t\"classname\": \"org.example.SinkStressTest\", \n" +
- "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" +
- "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" +
- "\t\"parallelism\": 5\n" +
- "}"
- )
+ @ApiParam(value =
+ "A JSON value presenting config payload of a Pulsar Sink."
+ + " All available configuration options are:\n"
+ + "- **classname**\n"
+ + " The class name of a Pulsar Sink if"
+ + " archive is file-url-path (file://)\n"
+ + "- **sourceSubscriptionName**\n"
+ + " Pulsar source subscription name if user wants a specific\n"
+ + " subscription-name for input-topic consumer\n"
+ + "- **inputs**\n"
+ + " The input topic or topics of"
+ + " a Pulsar Sink (specified as a JSON array)\n"
+ + "- **topicsPattern**\n"
+ + " TopicsPattern to consume from list of topics under a namespace that "
+ + " match the pattern. [input] and [topicsPattern] are mutually "
+ + " exclusive. Add SerDe class name for a pattern in customSerdeInputs "
+ + " (supported for java fun only)"
+ + "- **topicToSerdeClassName**\n"
+ + " The map of input topics to"
+ + " SerDe class names (specified as a JSON object)\n"
+ + "- **topicToSchemaType**\n"
+ + " The map of input topics to Schema types or"
+ + " class names (specified as a JSON object)\n"
+ + "- **inputSpecs**\n"
+ + " The map of input topics to its consumer configuration,"
+ + " each configuration has schema of "
+ + " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
+ + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
+ + "- **configs**\n"
+ + " The map of configs (specified as a JSON object)\n"
+ + "- **secrets**\n"
+ + " a map of secretName(aka how the secret is going to be \n"
+ + " accessed in the function via context) to an object that \n"
+ + " encapsulates how the secret is fetched by the underlying \n"
+ + " secrets provider. The type of an value here can be found by the \n"
+ + " SecretProviderConfigurator.getSecretObjectType() method."
+ + " (specified as a JSON object)\n"
+ + "- **parallelism**\n"
+ + " The parallelism factor of a Pulsar Sink "
+ + "(i.e. the number of a Pulsar Sink instances to run \n"
+ + "- **processingGuarantees**\n"
+ + " The processing guarantees (aka delivery semantics) applied to the"
+ + " Pulsar Sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\","
+ + " \"EFFECTIVELY_ONCE\"\n"
+ + "- **retainOrdering**\n"
+ + " Boolean denotes whether the Pulsar Sink"
+ + " consumes and processes messages in order\n"
+ + "- **resources**\n"
+ + " {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores),"
+ + " RAM (in bytes) and disk (in bytes) that needs to be allocated per"
+ + " Pulsar Sink instance (applicable only to Docker runtime)\n"
+ + "- **autoAck**\n"
+ + " Boolean denotes whether or not the framework will"
+ + " automatically acknowledge messages\n"
+ + "- **timeoutMs**\n"
+ + " Long denotes the message timeout in milliseconds\n"
+ + "- **cleanupSubscription**\n"
+ + " Boolean denotes whether the subscriptions the functions"
+ + " created/used should be deleted when the functions is deleted\n"
+ + "- **runtimeFlags**\n"
+ + " Any flags that you want to pass to the runtime as a single string\n",
+ examples = @Example(
+ value = @ExampleProperty(
+ mediaType = MediaType.APPLICATION_JSON,
+ value = "{\n"
+ + "\t\"classname\": \"org.example.SinkStressTest\",\n"
+ + "\t\"inputs\": ["
+ + "\"persistent://public/default/sink-input\"],\n"
+ + "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n"
+ + "\t\"parallelism\": 5\n"
+ + "}"
+ )
)
)
final @FormDataParam("sinkConfig") SinkConfig sinkConfig,
@@ -239,7 +274,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
@ApiResponse(code = 200, message = "The Pulsar Sink was successfully deleted"),
@ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
- @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to deregister, etc.)"),
+ @ApiResponse(code = 500, message =
+ "Internal server error (failed to authorize, failed to deregister, etc.)"),
@ApiResponse(code = 408, message = "Got InterruptedException while deregistering the Pulsar Sink"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@@ -350,7 +386,9 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@ApiResponse(code = 400, message = "Invalid restart request"),
@ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
@ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
- @ApiResponse(code = 500, message = "Internal server error (failed to restart the instance of a Pulsar Sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 500, message =
+ "Internal server error (failed to restart the instance of"
+ + " a Pulsar Sink, failed to authorize, etc.)"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
@@ -363,7 +401,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("sinkName") String sinkName,
@ApiParam(value = "The instanceId of a Pulsar Sink")
final @PathParam("instanceId") String instanceId) {
- sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -372,7 +411,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@ApiResponse(code = 400, message = "Invalid restart request"),
@ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
@ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
- @ApiResponse(code = 500, message = "Internal server error (failed to restart the Pulsar Sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 500, message =
+ "Internal server error (failed to restart the Pulsar Sink, failed to authorize, etc.)"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}/restart")
@@ -391,7 +431,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid stop request"),
@ApiResponse(code = 404, message = "The Pulsar Sink instance does not exist"),
- @ApiResponse(code = 500, message = "Internal server error (failed to stop the Pulsar Sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 500, message =
+ "Internal server error (failed to stop the Pulsar Sink, failed to authorize, etc.)"),
@ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@@ -405,7 +446,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("sinkName") String sinkName,
@ApiParam(value = "The instanceId of a Pulsar Sink")
final @PathParam("instanceId") String instanceId) {
- sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ sink.stopFunctionInstance(tenant, namespace,
+ sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -413,7 +455,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid stop request"),
@ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
- @ApiResponse(code = 500, message = "Internal server error (failed to stop the Pulsar Sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 500, message =
+ "Internal server error (failed to stop the Pulsar Sink, failed to authorize, etc.)"),
@ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@@ -433,7 +476,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid start request"),
@ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
- @ApiResponse(code = 500, message = "Internal server error (failed to start the Pulsar Sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 500, message =
+ "Internal server error (failed to start the Pulsar Sink, failed to authorize, etc.)"),
@ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@@ -447,7 +491,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("sinkName") String sinkName,
@ApiParam(value = "The instanceId of a Pulsar Sink")
final @PathParam("instanceId") String instanceId) {
- sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ sink.startFunctionInstance(tenant, namespace, sinkName, instanceId,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -455,7 +500,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid start request"),
@ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
- @ApiResponse(code = 500, message = "Internal server error (failed to start the Pulsar Sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 500, message =
+ "Internal server error (failed to start the Pulsar Sink, failed to authorize, etc.)"),
@ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 771bdd2..cdd1595 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -65,7 +65,8 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
@ApiOperation(value = "Creates a new Pulsar Source in cluster mode")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Pulsar Function successfully created"),
- @ApiResponse(code = 400, message = "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
+ @ApiResponse(code = 400, message =
+ "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
@ApiResponse(code = 401, message = "Client is not authorize to perform operation"),
@ApiResponse(code = 500, message = "Internal Server Error"),
@ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
@@ -84,35 +85,41 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String sourcePkgUrl,
@ApiParam(
- value = "A JSON value presenting configuration payload of a Pulsar Source. An example of the expected functions can be found here. \n" +
- "- **classname** \n" +
- " The class name of a Pulsar Source if archive is file-url-path (file://). \n" +
- "- **topicName** \n" +
- " The Pulsar topic to which data is sent. \n" +
- "- **serdeClassName** \n" +
- " The SerDe classname for the Pulsar Source. \n" +
- "- **schemaType** \n" +
- " The schema type (either a builtin schema like 'avro', 'json', etc.. or " +
- " custom Schema class name to be used to encode messages emitted from the Pulsar Source \n" +
- "- **configs** \n" +
- " Source config key/values \n" +
- "- **secrets** \n" +
- " This is a map of secretName(that is how the secret is going to be accessed in the function via context) to an object that" +
- " encapsulates how the secret is fetched by the underlying secrets provider. The type of an value here can be found by the" +
- " SecretProviderConfigurator.getSecretObjectType() method. \n" +
- "- **parallelism** \n" +
- " The parallelism factor of a Pulsar Source (i.e. the number of a Pulsar Source instances to run). \n" +
- "- **processingGuarantees** \n" +
- " The processing guarantees (aka delivery semantics) applied to the Pulsar Source. " +
- " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE] \n" +
- "- **resources** \n" +
- " The size of the system resources allowed by the Pulsar Source runtime. The resources include: cpu, ram, disk. \n" +
- "- **archive** \n" +
- " The path to the NAR archive for the Pulsar Source. It also supports url-path " +
- " [http/https/file (file protocol assumes that file already exists on worker host)] " +
- " from which worker can download the package. \n" +
- "- **runtimeFlags** \n" +
- " Any flags that you want to pass to the runtime. \n",
+ value = "A JSON value presenting configuration payload of a Pulsar Source."
+ + " An example of the expected functions can be found here.\n"
+ + "- **classname**\n"
+ + " The class name of a Pulsar Source if archive is file-url-path (file://).\n"
+ + "- **topicName**\n"
+ + " The Pulsar topic to which data is sent.\n"
+ + "- **serdeClassName**\n"
+ + " The SerDe classname for the Pulsar Source.\n"
+ + "- **schemaType**\n"
+ + " The schema type (either a builtin schema like 'avro', 'json', etc.. or "
+ + " custom Schema class name to be used to"
+ + " encode messages emitted from the Pulsar Source\n"
+ + "- **configs**\n"
+ + " Source config key/values\n"
+ + "- **secrets**\n"
+ + " This is a map of secretName(that is how the secret is going"
+ + " to be accessed in the function via context) to an object that"
+ + " encapsulates how the secret is fetched by the underlying secrets provider."
+ + " The type of an value here can be found by the"
+ + " SecretProviderConfigurator.getSecretObjectType() method. \n"
+ + "- **parallelism**\n"
+ + " The parallelism factor of a Pulsar Source"
+ + " (i.e. the number of a Pulsar Source instances to run).\n"
+ + "- **processingGuarantees**\n"
+ + " The processing guarantees (aka delivery semantics) applied to the Pulsar Source. "
+ + " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
+ + "- **resources**\n"
+ + " The size of the system resources allowed by the Pulsar Source runtime."
+ + " The resources include: cpu, ram, disk.\n"
+ + "- **archive**\n"
+ + " The path to the NAR archive for the Pulsar Source. It also supports url-path "
+ + " [http/https/file (file protocol assumes that file already exists on worker host)] "
+ + " from which worker can download the package.\n"
+ + "- **runtimeFlags**\n"
+ + " Any flags that you want to pass to the runtime.\n",
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
@@ -138,7 +145,8 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
@ApiOperation(value = "Updates a Pulsar Source currently running in cluster mode")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 400, message = "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
+ @ApiResponse(code = 400, message =
+ "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
@ApiResponse(code = 401, message = "Client is not authorize to perform operation"),
@ApiResponse(code = 200, message = "Pulsar Function successfully updated"),
@ApiResponse(code = 404, message = "Not Found(The Pulsar Source doesn't exist)"),
@@ -158,35 +166,41 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String sourcePkgUrl,
@ApiParam(
- value = "A JSON value presenting configuration payload of a Pulsar Source. An example of the expected functions can be found here. \n" +
- "- **classname** \n" +
- " The class name of a Pulsar Source if archive is file-url-path (file://). \n" +
- "- **topicName** \n" +
- " The Pulsar topic to which data is sent. \n" +
- "- **serdeClassName** \n" +
- " The SerDe classname for the Pulsar Source. \n" +
- "- **schemaType** \n" +
- " The schema type (either a builtin schema like 'avro', 'json', etc.. or " +
- " custom Schema class name to be used to encode messages emitted from the Pulsar Source \n" +
- "- **configs** \n" +
- " Pulsar Source config key/values \n" +
- "- **secrets** \n" +
- " This is a map of secretName(that is how the secret is going to be accessed in the function via context) to an object that" +
- " encapsulates how the secret is fetched by the underlying secrets provider. The type of an value here can be found by the" +
- " SecretProviderConfigurator.getSecretObjectType() method. \n" +
- "- **parallelism** \n" +
- " The parallelism factor of a Pulsar Source (i.e. the number of a Pulsar Source instances to run). \n" +
- "- **processingGuarantees** \n" +
- " The processing guarantees (aka delivery semantics) applied to the Pulsar Source. " +
- " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE] \n" +
- "- **resources** \n" +
- " The size of the system resources allowed by the Pulsar Source runtime. The resources include: cpu, ram, disk. \n" +
- "- **archive** \n" +
- " The path to the NAR archive for the Pulsar Source. It also supports url-path " +
- " [http/https/file (file protocol assumes that file already exists on worker host)] " +
- " from which worker can download the package. \n" +
- "- **runtimeFlags** \n" +
- " Any flags that you want to pass to the runtime. \n",
+ value = "A JSON value presenting configuration payload of a Pulsar Source."
+ + " An example of the expected functions can be found here.\n"
+ + "- **classname**\n"
+ + " The class name of a Pulsar Source if archive is file-url-path (file://).\n"
+ + "- **topicName**\n"
+ + " The Pulsar topic to which data is sent.\n"
+ + "- **serdeClassName**\n"
+ + " The SerDe classname for the Pulsar Source.\n"
+ + "- **schemaType**\n"
+ + " The schema type (either a builtin schema like 'avro', 'json', etc.. or "
+ + " custom Schema class name to be used to encode"
+ + " messages emitted from the Pulsar Source\n"
+ + "- **configs**\n"
+ + " Pulsar Source config key/values\n"
+ + "- **secrets**\n"
+ + " This is a map of secretName(that is how the secret is going to"
+ + " be accessed in the function via context) to an object that"
+ + " encapsulates how the secret is fetched by the underlying secrets provider."
+ + " The type of an value here can be found by the"
+ + " SecretProviderConfigurator.getSecretObjectType() method.\n"
+ + "- **parallelism**\n"
+ + " The parallelism factor of a Pulsar Source"
+ + " (i.e. the number of a Pulsar Source instances to run).\n"
+ + "- **processingGuarantees**\n"
+ + " The processing guarantees (aka delivery semantics) applied to the Pulsar Source. "
+ + " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
+ + "- **resources**\n"
+ + " The size of the system resources allowed by the Pulsar Source runtime."
+ + " The resources include: cpu, ram, disk.\n"
+ + "- **archive**\n"
+ + " The path to the NAR archive for the Pulsar Source. It also supports url-path "
+ + " [http/https/file (file protocol assumes that file already exists on worker host)] "
+ + " from which worker can download the package.\n"
+ + "- **runtimeFlags**\n"
+ + " Any flags that you want to pass to the runtime.\n",
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
@@ -267,14 +281,12 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(
- @ApiParam(value = "The tenant of a Pulsar Source")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Source")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Source")
- final @PathParam("sourceName") String sourceName,
- @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided, the stats of all instances is returned).")
- final @PathParam("instanceId") String instanceId) throws IOException {
+ @ApiParam(value = "The tenant of a Pulsar Source") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Source") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Source") final @PathParam("sourceName") String sourceName,
+ @ApiParam(value = "The instanceId of a Pulsar Source"
+ + " (if instance-id is not provided, the stats of all instances is returned).") final @PathParam(
+ "instanceId") String instanceId) throws IOException {
return source.getSourceInstanceStatus(
tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@@ -298,7 +310,8 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
final @PathParam("namespace") String namespace,
@ApiParam(value = "The name of a Pulsar Source")
final @PathParam("sourceName") String sourceName) throws IOException {
- return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(), clientAuthData());
+ return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(),
+ clientAuthData());
}
@GET
@@ -336,15 +349,14 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
@Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
@Consumes(MediaType.APPLICATION_JSON)
public void restartSource(
- @ApiParam(value = "The tenant of a Pulsar Source")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Source")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Source")
- final @PathParam("sourceName") String sourceName,
- @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided, the stats of all instances is returned).")
- final @PathParam("instanceId") String instanceId) {
- source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ @ApiParam(value = "The tenant of a Pulsar Source") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Source") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Source") final @PathParam("sourceName") String sourceName,
+ @ApiParam(value = "The instanceId of a Pulsar Source"
+ + " (if instance-id is not provided, the stats of all instances is returned).") final @PathParam(
+ "instanceId") String instanceId) {
+ source.restartFunctionInstance(tenant, namespace, sourceName, instanceId,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -380,15 +392,13 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
@Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
@Consumes(MediaType.APPLICATION_JSON)
public void stopSource(
- @ApiParam(value = "The tenant of a Pulsar Source")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Source")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Source")
- final @PathParam("sourceName") String sourceName,
- @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided, the stats of all instances is returned).")
- final @PathParam("instanceId") String instanceId) {
- source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ @ApiParam(value = "The tenant of a Pulsar Source") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Source") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Source") final @PathParam("sourceName") String sourceName,
+ @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided,"
+ + " the stats of all instances is returned).") final @PathParam("instanceId") String instanceId) {
+ source.stopFunctionInstance(tenant, namespace, sourceName, instanceId,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -424,15 +434,13 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
@Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startSource(
- @ApiParam(value = "The tenant of a Pulsar Source")
- final @PathParam("tenant") String tenant,
- @ApiParam(value = "The namespace of a Pulsar Source")
- final @PathParam("namespace") String namespace,
- @ApiParam(value = "The name of a Pulsar Source")
- final @PathParam("sourceName") String sourceName,
- @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided, the stats of all instances is returned).")
- final @PathParam("instanceId") String instanceId) {
- source.startFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+ @ApiParam(value = "The tenant of a Pulsar Source") final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The namespace of a Pulsar Source") final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The name of a Pulsar Source") final @PathParam("sourceName") String sourceName,
+ @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided,"
+ + " the stats of all instances is returned).") final @PathParam("instanceId") String instanceId) {
+ source.startFunctionInstance(tenant, namespace, sourceName, instanceId,
+ uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index 5b0b52d..e1c270d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -127,7 +127,8 @@ public class TenantsBase extends AdminResource {
@POST
@Path("/{tenant}")
- @ApiOperation(value = "Update the admins for a tenant.", notes = "This operation requires Pulsar super-user privileges.")
+ @ApiOperation(value = "Update the admins for a tenant.",
+ notes = "This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "Tenant already exists"),
@@ -168,7 +169,8 @@ public class TenantsBase extends AdminResource {
if (!clustersWithActiveNamespaces.isEmpty()) {
// Throw an exception because colos being removed are having active namespaces
String msg = String.format(
- "Failed to update the tenant because active namespaces are present in colos %s. Please delete those namespaces first",
+ "Failed to update the tenant because active namespaces are present in colos %s."
+ + " Please delete those namespaces first",
clustersWithActiveNamespaces);
throw new RestException(Status.CONFLICT, msg);
}
@@ -233,8 +235,9 @@ public class TenantsBase extends AdminResource {
private void validateClusters(TenantInfo info) {
// empty cluster shouldn't be allowed
- if (info == null || info.getAllowedClusters().stream().filter(c -> !StringUtils.isBlank(c)).collect(Collectors.toSet()).isEmpty()
- || info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) {
+ if (info == null || info.getAllowedClusters().stream()
+ .filter(c -> !StringUtils.isBlank(c)).collect(Collectors.toSet()).isEmpty()
+ || info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) {
log.warn("[{}] Failed to validate due to clusters are empty", clientAppId());
throw new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty");
}