You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/01 04:29:48 UTC
[pulsar] branch master updated: Make broker module gradually
conform the checkstyle (#8592)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fb61a0f Make broker module gradually conform the checkstyle (#8592)
fb61a0f is described below
commit fb61a0f6e07c7bdf7b7a96885bd6f87deb2709ac
Author: Renkai <ga...@gmail.com>
AuthorDate: Tue Dec 1 12:29:26 2020 +0800
Make broker module gradually conform the checkstyle (#8592)
---
.../resources/pulsar/checkstyle-pulsar-broker.xml | 151 ---------------------
.../src/main/resources/pulsar/suppressions.xml | 6 +
pulsar-broker/pom.xml | 2 +-
.../org/apache/pulsar/PulsarBrokerStarter.java | 44 +++---
.../apache/pulsar/PulsarClusterMetadataSetup.java | 60 ++++----
.../pulsar/PulsarClusterMetadataTeardown.java | 5 +-
.../java/org/apache/pulsar/PulsarStandalone.java | 19 ++-
.../org/apache/pulsar/PulsarStandaloneStarter.java | 5 +-
.../PulsarTransactionCoordinatorMetadataSetup.java | 2 +-
.../pulsar/ZookeeperSessionExpiredHandlers.java | 5 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 2 +-
.../apache/pulsar/broker/admin/v1/Namespaces.java | 2 +-
.../org/apache/pulsar/client/api/RawReader.java | 8 +-
.../pulsar/client/impl/RawBatchConverter.java | 17 ++-
.../apache/pulsar/client/impl/RawReaderImpl.java | 10 +-
.../common/naming/NamespaceBundleFactory.java | 13 +-
.../naming/NamespaceBundleSplitAlgorithm.java | 18 +--
.../pulsar/common/naming/NamespaceBundles.java | 2 +-
.../RangeEquallyDivideBundleSplitAlgorithm.java | 4 +-
.../pulsar/utils/auth/tokens/TokensCliUtils.java | 81 ++++++-----
.../apache/pulsar/broker/admin/AdminApiTest.java | 6 +-
.../broker/namespace/NamespaceServiceTest.java | 10 +-
22 files changed, 188 insertions(+), 284 deletions(-)
diff --git a/buildtools/src/main/resources/pulsar/checkstyle-pulsar-broker.xml b/buildtools/src/main/resources/pulsar/checkstyle-pulsar-broker.xml
deleted file mode 100644
index 77f901a..0000000
--- a/buildtools/src/main/resources/pulsar/checkstyle-pulsar-broker.xml
+++ /dev/null
@@ -1,151 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<!DOCTYPE module PUBLIC
- "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
- "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
-
-<!-- This is a checkstyle configuration file. For descriptions of
-what the following rules do, please see the checkstyle configuration
-page at http://checkstyle.sourceforge.net/config.html -->
-
-<module name="Checker">
- <!-- TODO: gradually make this file same as checkstyle.xml then remove it -->
-
- <module name="SuppressWarningsFilter"/>
-
- <module name="FileTabCharacter">
- <!-- Checks that there are no tab characters in the file. -->
- </module>
-
- <module name="RegexpSingleline">
- <!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. -->
- <property name="format" value="((//.*)|(\*.*))TODO\("/>
- <property name="message" value="TODO comments must not include usernames."/>
- <property name="severity" value="error"/>
- </module>
-
- <module name="RegexpSingleline">
- <property name="format" value="\s+$"/>
- <property name="message" value="Trailing whitespace"/>
- <property name="severity" value="error"/>
- </module>
-
- <module name="RegexpSingleline">
- <property name="format" value="Throwables.propagate\("/>
- <property name="message" value="Throwables.propagate is deprecated"/>
- <property name="severity" value="error"/>
- </module>
-
- <!-- Prevent *Tests.java as tools may not pick them up -->
- <module name="RegexpOnFilename">
- <property name="fileNamePattern" value=".*Tests\.java$"/>
- </module>
-
- <module name="SuppressionFilter">
- <property name="file" value="${checkstyle.suppressions.file}" default="suppressions.xml"/>
- </module>
-
- <!-- Check that every module has a package-info.java -->
- <module name="JavadocPackage"/>
-
- <!-- All Java AST specific tests live under TreeWalker module. -->
- <module name="TreeWalker">
-
- <!-- Allow use of comment to suppress javadocstyle -->
- <module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/>
- <property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
- <property name="checkFormat" value="$1"/>
- </module>
-
- <module name="SuppressWarningsHolder"/>
-
- <module name="TodoComment">
- <!-- Checks that disallowed strings are not used in comments. -->
- <property name="format" value="(FIXME)|(XXX)|(@author)"/>
- </module>
-
- <!--
-
- IMPORT CHECKS
-
- -->
-
- <module name="RedundantImport">
- <!-- Checks for redundant import statements. -->
- <property name="severity" value="error"/>
- <message key="import.redundancy"
- value="Redundant import {0}."/>
- </module>
-
- <module name="ImportOrder">
- <property name="severity" value="error"/>
- <!-- This ensures that static imports go first. -->
- <property name="option" value="top"/>
- <property name="sortStaticImportsAlphabetically" value="true"/>
- <property name="tokens" value="STATIC_IMPORT, IMPORT"/>
- <message key="import.ordering"
- value="Import {0} appears after other imports that it should precede"/>
- </module>
-
- <module name="AvoidStarImport">
- <property name="severity" value="error"/>
- </module>
-
- <module name="IllegalImport">
- <property name="illegalPkgs"
- value="autovalue.shaded, avro.shaded, bk-shade, com.google.api.client.repackaged, com.google.appengine.repackaged, io.netty.util.internal"/>
- </module>
-
- <module name="RedundantModifier">
- <!-- Checks for redundant modifiers on various symbol definitions.
- See: http://checkstyle.sourceforge.net/config_modifier.html#RedundantModifier
- -->
- <property name="tokens"
- value="METHOD_DEF, VARIABLE_DEF, ANNOTATION_FIELD_DEF, INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/>
- </module>
-
- <!--
- IllegalImport cannot blacklist classes, and c.g.api.client.util is used for some shaded
- code and some useful code. So we need to fall back to Regexp.
- -->
- <module name="RegexpSinglelineJava">
- <property name="format"
- value="com\.google\.api\.client\.util\.(ByteStreams|Charsets|Collections2|Joiner|Lists|Maps|Objects|Preconditions|Sets|Strings|Throwables)"/>
- </module>
-
- <!--
- Require static importing from Preconditions.
- -->
- <module name="RegexpSinglelineJava">
- <property name="format" value="^import com.google.common.base.Preconditions;$"/>
- <property name="message" value="Static import functions from Guava Preconditions"/>
- </module>
-
- <module name="UnusedImports">
- <property name="severity" value="error"/>
- <property name="processJavadoc" value="true"/>
- <message key="import.unused"
- value="Unused import: {0}."/>
- </module>
- </module>
-</module>
diff --git a/buildtools/src/main/resources/pulsar/suppressions.xml b/buildtools/src/main/resources/pulsar/suppressions.xml
index 1b22378..12eae26 100644
--- a/buildtools/src/main/resources/pulsar/suppressions.xml
+++ b/buildtools/src/main/resources/pulsar/suppressions.xml
@@ -39,6 +39,7 @@
<!-- suppress all checks in the copied code -->
<suppress checks=".*" files=".+[\\/]com[\\/]scurrilous[\\/]circe[\\/].+\.java"/>
+ <!-- TODO: gradually delete below lines to make the whole project conform the checkstyle rule -->
<suppress checks=".*" files="MLDataFormats.java"/>
<suppress checks=".*" files="BitSetRecyclable.java"/>
<suppress checks=".*" files="Schema.java"/>
@@ -47,4 +48,9 @@
<suppress checks="ConstantName" files="MessageId.java"/>
<suppress checks="MethodName" files="TopicsImpl.java"/>
<suppress checks="MemberName" files="TopicsImpl.java"/>
+ <suppress checks="ImportOrder" files="src/main/java/org/apache/pulsar/common/.*.java"/>
+ <suppress checks="ImportOrder" files="src/main/java/org/apache/pulsar/client/.*.java"/>
+ <suppress checks="ImportOrder" files="src/main/java/org/apache/pulsar/transaction/.*.java"/>
+ <suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/.*.java"/>
+ <suppress checks=".*" files="src/main/java/org/apache/pulsar/compaction/.*.java"/>
</suppressions>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 6f6f42b..fe4c7eb 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -350,7 +350,7 @@
<id>check-style</id>
<phase>verify</phase>
<configuration>
- <configLocation>../buildtools/src/main/resources/pulsar/checkstyle-pulsar-broker.xml</configLocation>
+ <configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
<suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
<excludes>**/proto/*</excludes>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index c2f2b1c..c5403bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -71,22 +71,26 @@ public class PulsarBrokerStarter {
@VisibleForTesting
private static class StarterArguments {
@Parameter(names = {"-c", "--broker-conf"}, description = "Configuration file for Broker")
- private String brokerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/broker.conf";
+ private String brokerConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() + "/conf/broker.conf";
@Parameter(names = {"-rb", "--run-bookie"}, description = "Run Bookie together with Broker")
private boolean runBookie = false;
- @Parameter(names = {"-ra", "--run-bookie-autorecovery"}, description = "Run Bookie Autorecovery together with broker")
+ @Parameter(names = {"-ra", "--run-bookie-autorecovery"},
+ description = "Run Bookie Autorecovery together with broker")
private boolean runBookieAutoRecovery = false;
@Parameter(names = {"-bc", "--bookie-conf"}, description = "Configuration file for Bookie")
- private String bookieConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf";
+ private String bookieConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf";
@Parameter(names = {"-rfw", "--run-functions-worker"}, description = "Run functions worker with Broker")
private boolean runFunctionsWorker = false;
@Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
- private String fnWorkerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
+ private String fnWorkerConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
@Parameter(names = {"-h", "--help"}, description = "Show this help message")
private boolean help = false;
@@ -151,13 +155,17 @@ public class PulsarBrokerStarter {
throw new IllegalArgumentException("Max message size need smaller than jvm directMemory");
}
- if (!NamespaceBundleSplitAlgorithm.availableAlgorithms.containsAll(brokerConfig.getSupportedNamespaceBundleSplitAlgorithms())) {
- throw new IllegalArgumentException("The given supported namespace bundle split algorithm has unavailable algorithm. " +
- "Available algorithms are " + NamespaceBundleSplitAlgorithm.availableAlgorithms);
+ if (!NamespaceBundleSplitAlgorithm.AVAILABLE_ALGORITHMS.containsAll(
+ brokerConfig.getSupportedNamespaceBundleSplitAlgorithms())) {
+ throw new IllegalArgumentException(
+ "The given supported namespace bundle split algorithm has unavailable algorithm. "
+ + "Available algorithms are " + NamespaceBundleSplitAlgorithm.AVAILABLE_ALGORITHMS);
}
- if (!brokerConfig.getSupportedNamespaceBundleSplitAlgorithms().contains(brokerConfig.getDefaultNamespaceBundleSplitAlgorithm())) {
- throw new IllegalArgumentException("Supported namespace bundle split algorithms must contains the default namespace bundle split algorithm");
+ if (!brokerConfig.getSupportedNamespaceBundleSplitAlgorithms().contains(
+ brokerConfig.getDefaultNamespaceBundleSplitAlgorithm())) {
+ throw new IllegalArgumentException("Supported namespace bundle split algorithms "
+ + "must contains the default namespace bundle split algorithm");
}
// init functions worker
@@ -193,7 +201,8 @@ public class PulsarBrokerStarter {
// client in worker will use this config to authenticate with broker
workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
- workerConfig.setBrokerClientAuthenticationParameters(brokerConfig.getBrokerClientAuthenticationParameters());
+ workerConfig.setBrokerClientAuthenticationParameters(
+ brokerConfig.getBrokerClientAuthenticationParameters());
// inherit super users
workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles());
@@ -214,13 +223,13 @@ public class PulsarBrokerStarter {
// if no argument to run bookie in cmd line, read from pulsar config
if (!argsContains(args, "-rb") && !argsContains(args, "--run-bookie")) {
- checkState(starterArguments.runBookie == false,
- "runBookie should be false if has no argument specified");
+ checkState(!starterArguments.runBookie,
+ "runBookie should be false if has no argument specified");
starterArguments.runBookie = brokerConfig.isEnableRunBookieTogether();
}
if (!argsContains(args, "-ra") && !argsContains(args, "--run-bookie-autorecovery")) {
- checkState(starterArguments.runBookieAutoRecovery == false,
- "runBookieAutoRecovery should be false if has no argument specified");
+ checkState(!starterArguments.runBookieAutoRecovery,
+ "runBookieAutoRecovery should be false if has no argument specified");
starterArguments.runBookieAutoRecovery = brokerConfig.isEnableRunBookieAutoRecoveryTogether();
}
@@ -246,7 +255,8 @@ public class PulsarBrokerStarter {
if (starterArguments.runBookie) {
checkNotNull(bookieConfig, "No ServerConfiguration for Bookie");
checkNotNull(bookieStatsProvider, "No Stats Provider for Bookie");
- bookieServer = new BookieServer(bookieConfig, bookieStatsProvider.getStatsLogger(""), BookieServiceInfo.NO_INFO);
+ bookieServer = new BookieServer(
+ bookieConfig, bookieStatsProvider.getStatsLogger(""), BookieServiceInfo.NO_INFO);
} else {
bookieServer = null;
}
@@ -323,7 +333,9 @@ public class PulsarBrokerStarter {
public static void main(String[] args) throws Exception {
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
- System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
+ System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s",
+ dateFormat.format(new Date()), thread.getContextClassLoader(),
+ thread.getName(), exception.getMessage()));
});
BrokerStarter starter = new BrokerStarter(args);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 5bb6a99..b533fa3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -56,7 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Setup the metadata for a new Pulsar cluster
+ * Setup the metadata for a new Pulsar cluster.
*/
public class PulsarClusterMetadataSetup {
@@ -68,16 +68,18 @@ public class PulsarClusterMetadataSetup {
"--web-service-url" }, description = "Web-service URL for new cluster", required = true)
private String clusterWebServiceUrl;
- @Parameter(names = { "-tw",
- "--web-service-url-tls" }, description = "Web-service URL for new cluster with TLS encryption", required = false)
+ @Parameter(names = {"-tw",
+ "--web-service-url-tls"},
+ description = "Web-service URL for new cluster with TLS encryption", required = false)
private String clusterWebServiceUrlTls;
@Parameter(names = { "-ub",
"--broker-service-url" }, description = "Broker-service URL for new cluster", required = false)
private String clusterBrokerServiceUrl;
- @Parameter(names = { "-tb",
- "--broker-service-url-tls" }, description = "Broker-service URL for new cluster with TLS encryption", required = false)
+ @Parameter(names = {"-tb",
+ "--broker-service-url-tls"},
+ description = "Broker-service URL for new cluster with TLS encryption", required = false)
private String clusterBrokerServiceUrlTls;
@Parameter(names = { "-zk",
@@ -89,8 +91,9 @@ public class PulsarClusterMetadataSetup {
}, description = "Local zookeeper session timeout ms")
private int zkSessionTimeoutMillis = 30000;
- @Parameter(names = { "-gzk",
- "--global-zookeeper" }, description = "Global ZooKeeper quorum connection string", required = false, hidden = true)
+ @Parameter(names = {"-gzk",
+ "--global-zookeeper"},
+ description = "Global ZooKeeper quorum connection string", required = false, hidden = true)
private String globalZookeeper;
@Parameter(names = { "-cs",
@@ -108,7 +111,8 @@ public class PulsarClusterMetadataSetup {
private int numTransactionCoordinators = 16;
@Parameter(names = {
- "--existing-bk-metadata-service-uri" }, description = "The metadata service URI of the existing BookKeeper cluster that you want to use")
+ "--existing-bk-metadata-service-uri"},
+ description = "The metadata service URI of the existing BookKeeper cluster that you want to use")
private String existingBkMetadataServiceUri;
@Parameter(names = { "-h", "--help" }, description = "Show this help message")
@@ -116,7 +120,7 @@ public class PulsarClusterMetadataSetup {
}
/**
- * a wrapper for ZkUtils.createFullPathOptimistic but ignore exception of node exists
+ * a wrapper for ZkUtils.createFullPathOptimistic but ignore exception of node exists.
*/
private static void createZkNode(ZooKeeper zkc, String path,
byte[] data, final List<ACL> acl, final CreateMode createMode)
@@ -151,7 +155,8 @@ public class PulsarClusterMetadataSetup {
}
if (arguments.configurationStore != null && arguments.globalZookeeper != null) {
- System.err.println("Configuration store argument (--configuration-store) supersedes the deprecated (--global-zookeeper) argument");
+ System.err.println("Configuration store argument (--configuration-store) "
+ + "supersedes the deprecated (--global-zookeeper) argument");
jcommander.usage();
System.exit(1);
}
@@ -184,7 +189,8 @@ public class PulsarClusterMetadataSetup {
// Format BookKeeper stream storage metadata
if (arguments.numStreamStorageContainers > 0) {
- String uriStr = arguments.existingBkMetadataServiceUri == null ? bkConf.getMetadataServiceUri() : arguments.existingBkMetadataServiceUri;
+ String uriStr = arguments.existingBkMetadataServiceUri == null
+ ? bkConf.getMetadataServiceUri() : arguments.existingBkMetadataServiceUri;
ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
ClusterInitializer initializer = new ZkClusterInitializer(arguments.zookeeper);
initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
@@ -201,24 +207,25 @@ public class PulsarClusterMetadataSetup {
createZkNode(localZk, "/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
createZkNode(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ CreateMode.PERSISTENT);
createZkNode(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ CreateMode.PERSISTENT);
ClusterData clusterData = new ClusterData(arguments.clusterWebServiceUrl, arguments.clusterWebServiceUrlTls,
arguments.clusterBrokerServiceUrl, arguments.clusterBrokerServiceUrlTls);
byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);
- createZkNode(configStoreZk,"/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ createZkNode(configStoreZk, "/admin/clusters/" + arguments.cluster, clusterDataJson,
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
// Create marker for "global" cluster
ClusterData globalClusterData = new ClusterData(null, null);
byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData);
createZkNode(configStoreZk, "/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ CreateMode.PERSISTENT);
// Create public tenant, whitelisted to use the this same cluster, along with other clusters
createTenantIfAbsent(configStoreZk, TopicName.PUBLIC_TENANT, arguments.cluster);
@@ -234,7 +241,8 @@ public class PulsarClusterMetadataSetup {
createNamespaceIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);
// Create transaction coordinator assign partitioned topic
- createPartitionedTopic(configStoreZk, TopicName.TRANSACTION_COORDINATOR_ASSIGN, arguments.numTransactionCoordinators);
+ createPartitionedTopic(configStoreZk, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ arguments.numTransactionCoordinators);
localZk.close();
configStoreZk.close();
@@ -271,7 +279,7 @@ public class PulsarClusterMetadataSetup {
static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName namespaceName, String cluster)
throws KeeperException, InterruptedException, IOException {
- String namespacePath = POLICIES_ROOT + "/" +namespaceName.toString();
+ String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
Policies policies;
Stat stat = configStoreZk.exists(namespacePath, false);
if (stat == null) {
@@ -299,21 +307,23 @@ public class PulsarClusterMetadataSetup {
}
}
- static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName, int numPartitions) throws KeeperException, InterruptedException, IOException {
+ static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName, int numPartitions)
+ throws KeeperException, InterruptedException, IOException {
String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
Stat stat = configStoreZk.exists(partitionedTopicPath, false);
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions);
if (stat == null) {
createZkNode(
- configStoreZk,
- partitionedTopicPath,
- ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT
+ configStoreZk,
+ partitionedTopicPath,
+ ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT
);
} else {
byte[] content = configStoreZk.getData(partitionedTopicPath, false, null);
- PartitionedTopicMetadata existsMeta = ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);
+ PartitionedTopicMetadata existsMeta =
+ ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);
// Only update z-node if the partitions should be modified
if (existsMeta.partitions < numPartitions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index 42cb245..80b34cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -41,7 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Teardown the metadata for a existed Pulsar cluster
+ * Teardown the metadata for a existed Pulsar cluster.
*/
public class PulsarClusterMetadataTeardown {
@@ -87,7 +87,8 @@ public class PulsarClusterMetadataTeardown {
}
if (arguments.bkMetadataServiceUri != null) {
- BookKeeper bookKeeper = new BookKeeper(new ClientConfiguration().setMetadataServiceUri(arguments.bkMetadataServiceUri));
+ BookKeeper bookKeeper =
+ new BookKeeper(new ClientConfiguration().setMetadataServiceUri(arguments.bkMetadataServiceUri));
ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
ManagedLedgerFactory managedLedgerFactory = new ManagedLedgerFactoryImpl(bookKeeper, localZk);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 49877e3..9cea534 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -76,7 +76,9 @@ public class PulsarStandalone implements AutoCloseable {
this.advertisedAddress = advertisedAddress;
}
- public void setConfig(ServiceConfiguration config) { this.config = config; }
+ public void setConfig(ServiceConfiguration config) {
+ this.config = config;
+ }
public void setFnWorkerService(WorkerService fnWorkerService) {
this.fnWorkerService = fnWorkerService;
@@ -225,7 +227,8 @@ public class PulsarStandalone implements AutoCloseable {
private boolean noFunctionsWorker = false;
@Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
- private String fnWorkerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
+ private String fnWorkerConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
@Parameter(names = {"-nss", "--no-stream-storage"}, description = "Disable stream storage")
private boolean noStreamStorage = false;
@@ -330,9 +333,12 @@ public class PulsarStandalone implements AutoCloseable {
String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort().get()));
String brokerServiceUrl = String.format("pulsar://%s:%d", config.getAdvertisedAddress(),
config.getBrokerServicePort().get());
- admin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication(
- config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()).build();
- ClusterData clusterData = new ClusterData(webServiceUrl.toString(), null, brokerServiceUrl, null);
+ admin = PulsarAdmin.builder().serviceHttpUrl(
+ webServiceUrl.toString()).authentication(
+ config.getBrokerClientAuthenticationPlugin(),
+ config.getBrokerClientAuthenticationParameters()).build();
+ ClusterData clusterData =
+ new ClusterData(webServiceUrl.toString(), null, brokerServiceUrl, null);
createSampleNameSpace(clusterData, cluster);
} else {
URL webServiceUrlTls = new URL(
@@ -379,7 +385,8 @@ public class PulsarStandalone implements AutoCloseable {
}
if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
admin.namespaces().createNamespace(defaultNamespace);
- admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(config.getClusterName()));
+ admin.namespaces().setNamespaceReplicationClusters(
+ defaultNamespace, Sets.newHashSet(config.getClusterName()));
}
} catch (PulsarAdminException e) {
log.info(e.getMessage(), e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 36b462c..27c9a65 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -53,7 +53,8 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
return;
}
- this.config = PulsarConfigurationLoader.create((new FileInputStream(this.getConfigFile())), ServiceConfiguration.class);
+ this.config = PulsarConfigurationLoader.create(
+ (new FileInputStream(this.getConfigFile())), ServiceConfiguration.class);
String zkServers = "127.0.0.1";
@@ -70,7 +71,7 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
// Set ZK server's host to localhost
// Priority: args > conf > default
- if (argsContains(args,"--zookeeper-port")) {
+ if (argsContains(args, "--zookeeper-port")) {
config.setZookeeperServers(zkServers + ":" + this.getZkPort());
config.setConfigurationStoreServers(zkServers + ":" + this.getZkPort());
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
index c77d0c9..f105b9b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
@@ -26,7 +26,7 @@ import org.apache.zookeeper.ZooKeeper;
/**
* Setup the transaction coordinator metadata for a cluster, the setup will create pulsar/system namespace and create
- * partitioned topic for transaction coordinator assign
+ * partitioned topic for transaction coordinator assign.
*/
public class PulsarTransactionCoordinatorMetadataSetup {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
index e654193..dddb9f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
/**
- * Handlers for broker service to handle Zookeeper session expired
+ * Handlers for broker service to handle Zookeeper session expired.
*/
public class ZookeeperSessionExpiredHandlers {
@@ -35,7 +35,8 @@ public class ZookeeperSessionExpiredHandlers {
return new ShutDownWhenSessionExpired(shutdownService);
}
- public static ZookeeperSessionExpiredHandler reconnectWhenZookeeperSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) {
+ public static ZookeeperSessionExpiredHandler reconnectWhenZookeeperSessionExpired(
+ PulsarService pulsarService, ShutdownService shutdownService) {
return new ReconnectWhenSessionExpired(pulsarService, shutdownService);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ff94374..ec4e028 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1374,7 +1374,7 @@ public abstract class NamespacesBase extends AdminResource {
algorithm = NamespaceBundleSplitAlgorithm.of(pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
}
if (algorithm == null) {
- algorithm = NamespaceBundleSplitAlgorithm.rangeEquallyDivide;
+ algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
}
return algorithm;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 743e3bf..e17399b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -567,7 +567,7 @@ public class Namespaces extends NamespacesBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
validateNamespaceName(property, cluster, namespace);
- internalSplitNamespaceBundle(bundleRange, authoritative, unload, NamespaceBundleSplitAlgorithm.rangeEquallyDivideName);
+ internalSplitNamespaceBundle(bundleRange, authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index e142924..415c3dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -38,7 +38,7 @@ public interface RawReader {
}
/**
- * Get the topic for the reader
+ * Get the topic for the reader.
*
* @return topic for the reader
*/
@@ -69,13 +69,13 @@ public interface RawReader {
* with the individual acknowledgement, so later acknowledgements will overwrite all
* properties from previous acknowledgements.
*
- * @param messageId to cumulatively acknowledge to
+ * @param messageId to cumulatively acknowledge to
* @param properties a map of properties which will be stored with the acknowledgement
*/
- CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String,Long> properties);
+ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties);
/**
- * Get the last message id available immediately available for reading
+ * Get the last message id available immediately available for reading.
*/
CompletableFuture<MessageId> getLastMessageIdAsync();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 7447419..17306ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -75,7 +75,9 @@ public class RawBatchConverter {
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadataBuilder.getCompactedOut()) {
- idsAndKeysAndSize.add(ImmutableTriple.of(id, singleMessageMetadataBuilder.getPartitionKey(), singleMessageMetadataBuilder.getPayloadSize()));
+ idsAndKeysAndSize.add(ImmutableTriple.of(
+ id, singleMessageMetadataBuilder.getPartitionKey(),
+ singleMessageMetadataBuilder.getPayloadSize()));
}
singleMessageMetadataBuilder.recycle();
singleMessagePayload.release();
@@ -108,16 +110,17 @@ public class RawBatchConverter {
int batchSize = metadata.getNumMessagesInBatch();
int messagesRetained = 0;
- SingleMessageMetadata.Builder emptyMetadataBuilder = SingleMessageMetadata.newBuilder().setCompactedOut(true);
+ SingleMessageMetadata.Builder emptyMetadataBuilder =
+ SingleMessageMetadata.newBuilder().setCompactedOut(true);
for (int i = 0; i < batchSize; i++) {
SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata.newBuilder();
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
- singleMessageMetadataBuilder,
- 0, batchSize);
+ singleMessageMetadataBuilder,
+ 0, batchSize);
MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
- msg.getMessageIdData().getEntryId(),
- msg.getMessageIdData().getPartition(),
- i);
+ msg.getMessageIdData().getEntryId(),
+ msg.getMessageIdData().getPartition(),
+ i);
if (!singleMessageMetadataBuilder.hasPartitionKey()) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3863d4d..6cdf62e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -83,7 +83,7 @@ public class RawReaderImpl implements RawReader {
}
@Override
- public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String,Long> properties) {
+ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
return consumer.doAcknowledgeWithTxn(messageId, AckType.Cumulative, properties, null);
}
@@ -140,7 +140,8 @@ public class RawReaderImpl implements RawReader {
} else {
int numMsg;
try {
- MessageMetadata msgMetadata = Commands.parseMessageMetadata(messageAndCnx.msg.getHeadersAndPayload());
+ MessageMetadata msgMetadata =
+ Commands.parseMessageMetadata(messageAndCnx.msg.getHeadersAndPayload());
numMsg = msgMetadata.getNumMessagesInBatch();
msgMetadata.recycle();
} catch (Throwable t) {
@@ -201,10 +202,11 @@ public class RawReaderImpl implements RawReader {
}
@Override
- void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
+ void messageReceived(MessageIdData messageId, int redeliveryCount,
+ List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription,
- messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition());
+ messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition());
}
incomingRawMessages.add(
new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 26caee0..f1a127e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -132,7 +132,7 @@ public class NamespaceBundleFactory implements ZooKeeperCacheListener<LocalPolic
}
/**
- * checks if the local broker is the owner of the namespace bundle
+ * checks if the local broker is the owner of the namespace bundle.
*
* @param nsBundle
* @return
@@ -223,11 +223,13 @@ public class NamespaceBundleFactory implements ZooKeeperCacheListener<LocalPolic
* @return List of split {@link NamespaceBundle} and {@link NamespaceBundles} that contains final bundles including
* split bundles for a given namespace
*/
- public Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundle targetBundle, int numBundles, Long splitBoundary) {
+ public Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(
+ NamespaceBundle targetBundle, int numBundles, Long splitBoundary) {
checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle);
if (splitBoundary != null) {
- checkArgument(splitBoundary > targetBundle.getLowerEndpoint() && splitBoundary < targetBundle.getUpperEndpoint(),
- "The given fixed key must between the key range of the %s bundle", targetBundle);
+ checkArgument(splitBoundary > targetBundle.getLowerEndpoint()
+ && splitBoundary < targetBundle.getUpperEndpoint(),
+ "The given fixed key must between the key range of the %s bundle", targetBundle);
numBundles = 2;
}
checkNotNull(targetBundle, "can't split null bundle");
@@ -261,7 +263,8 @@ public class NamespaceBundleFactory implements ZooKeeperCacheListener<LocalPolic
partitions[pos] = sourceBundle.partitions[lastIndex];
if (splitPartition != -1) {
// keep version of sourceBundle
- NamespaceBundles splittedNsBundles = new NamespaceBundles(nsname, partitions, this, sourceBundle.getVersion());
+ NamespaceBundles splittedNsBundles =
+ new NamespaceBundles(nsname, partitions, this, sourceBundle.getVersion());
List<NamespaceBundle> splittedBundles = splittedNsBundles.getBundles().subList(splitPartition,
(splitPartition + numBundles));
return new ImmutablePair<NamespaceBundles, List<NamespaceBundle>>(splittedNsBundles, splittedBundles);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
index c7647f5..1dc19f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
@@ -28,23 +28,23 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
*/
public interface NamespaceBundleSplitAlgorithm {
- String rangeEquallyDivideName = "range_equally_divide";
- String topicCountEquallyDivideName = "topic_count_equally_divide";
+ String RANGE_EQUALLY_DIVIDE_NAME = "range_equally_divide";
+ String TOPIC_COUNT_EQUALLY_DIVIDE = "topic_count_equally_divide";
- List<String> availableAlgorithms = Lists.newArrayList(rangeEquallyDivideName, topicCountEquallyDivideName);
+ List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME, TOPIC_COUNT_EQUALLY_DIVIDE);
- NamespaceBundleSplitAlgorithm rangeEquallyDivide = new RangeEquallyDivideBundleSplitAlgorithm();
- NamespaceBundleSplitAlgorithm topicCountEquallyDivide = new TopicCountEquallyDivideBundleSplitAlgorithm();
+ NamespaceBundleSplitAlgorithm RANGE_EQUALLY_DIVIDE_ALGO = new RangeEquallyDivideBundleSplitAlgorithm();
+ NamespaceBundleSplitAlgorithm TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = new TopicCountEquallyDivideBundleSplitAlgorithm();
static NamespaceBundleSplitAlgorithm of(String algorithmName) {
if (algorithmName == null) {
return null;
}
switch (algorithmName) {
- case rangeEquallyDivideName:
- return rangeEquallyDivide;
- case topicCountEquallyDivideName:
- return topicCountEquallyDivide;
+ case RANGE_EQUALLY_DIVIDE_NAME:
+ return RANGE_EQUALLY_DIVIDE_ALGO;
+ case TOPIC_COUNT_EQUALLY_DIVIDE:
+ return TOPIC_COUNT_EQUALLY_DIVIDE_ALGO;
default:
return null;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index af8c807..75e235f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -79,7 +79,7 @@ public class NamespaceBundles {
lowerBound = upperBound;
}
} else {
- this.partitions = new long[] { 0L };
+ this.partitions = new long[]{0L};
bundles.add(fullBundle);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java
index 855193c..e0b2347 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java
@@ -28,7 +28,7 @@ public class RangeEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSp
@Override
public CompletableFuture<Long> getSplitBoundary(NamespaceService service, NamespaceBundle bundle) {
- return CompletableFuture.completedFuture(bundle.getLowerEndpoint() +
- (bundle.getUpperEndpoint() - bundle.getLowerEndpoint()) / 2);
+ return CompletableFuture.completedFuture(bundle.getLowerEndpoint()
+ + (bundle.getUpperEndpoint() - bundle.getLowerEndpoint()) / 2);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
index 2aef18e..ee2c384 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
@@ -49,22 +49,22 @@ import org.apache.pulsar.common.util.RelativeTimeUtil;
public class TokensCliUtils {
public static class Arguments {
- @Parameter(names = { "-h", "--help" }, description = "Show this help message")
+ @Parameter(names = {"-h", "--help"}, description = "Show this help message")
private boolean help = false;
}
@Parameters(commandDescription = "Create a new secret key")
public static class CommandCreateSecretKey {
- @Parameter(names = { "-a",
- "--signature-algorithm" }, description = "The signature algorithm for the new secret key.")
+ @Parameter(names = {"-a",
+ "--signature-algorithm"}, description = "The signature algorithm for the new secret key.")
SignatureAlgorithm algorithm = SignatureAlgorithm.HS256;
- @Parameter(names = { "-o",
- "--output" }, description = "Write the secret key to a file instead of stdout")
+ @Parameter(names = {"-o",
+ "--output"}, description = "Write the secret key to a file instead of stdout")
String outputFile;
@Parameter(names = {
- "-b", "--base64" }, description = "Encode the key in base64")
+ "-b", "--base64"}, description = "Encode the key in base64")
boolean base64 = false;
public void run() throws IOException {
@@ -85,15 +85,15 @@ public class TokensCliUtils {
@Parameters(commandDescription = "Create a new or pair of keys public/private")
public static class CommandCreateKeyPair {
- @Parameter(names = { "-a",
- "--signature-algorithm" }, description = "The signature algorithm for the new key pair.")
+ @Parameter(names = {"-a",
+ "--signature-algorithm"}, description = "The signature algorithm for the new key pair.")
SignatureAlgorithm algorithm = SignatureAlgorithm.RS256;
@Parameter(names = {
- "--output-private-key" }, description = "File where to write the private key", required = true)
+ "--output-private-key"}, description = "File where to write the private key", required = true)
String privateKeyFile;
@Parameter(names = {
- "--output-public-key" }, description = "File where to write the public key", required = true)
+ "--output-public-key"}, description = "File where to write the public key", required = true)
String publicKeyFile;
public void run() throws IOException {
@@ -106,24 +106,29 @@ public class TokensCliUtils {
@Parameters(commandDescription = "Create a new token")
public static class CommandCreateToken {
- @Parameter(names = { "-a",
- "--signature-algorithm" }, description = "The signature algorithm for the new key pair.")
+ @Parameter(names = {"-a",
+ "--signature-algorithm"}, description = "The signature algorithm for the new key pair.")
SignatureAlgorithm algorithm = SignatureAlgorithm.RS256;
- @Parameter(names = { "-s",
- "--subject" }, description = "Specify the 'subject' or 'principal' associate with this token", required = true)
+ @Parameter(names = {"-s",
+ "--subject"},
+ description = "Specify the 'subject' or 'principal' associate with this token", required = true)
private String subject;
- @Parameter(names = { "-e",
- "--expiry-time" }, description = "Relative expiry time for the token (eg: 1h, 3d, 10y). (m=minutes) Default: no expiration")
+ @Parameter(names = {"-e",
+ "--expiry-time"},
+ description = "Relative expiry time for the token (eg: 1h, 3d, 10y)."
+ + " (m=minutes) Default: no expiration")
private String expiryTime;
- @Parameter(names = { "-sk",
- "--secret-key" }, description = "Pass the secret key for signing the token. This can either be: data:, file:, etc..")
+ @Parameter(names = {"-sk",
+ "--secret-key"},
+ description = "Pass the secret key for signing the token. This can either be: data:, file:, etc..")
private String secretKey;
- @Parameter(names = { "-pk",
- "--private-key" }, description = "Pass the private key for signing the token. This can either be: data:, file:, etc..")
+ @Parameter(names = {"-pk",
+ "--private-key"},
+ description = "Pass the private key for signing the token. This can either be: data:, file:, etc..")
private String privateKey;
public void run() throws Exception {
@@ -165,12 +170,12 @@ public class TokensCliUtils {
@Parameter(description = "The token string", arity = 1)
private java.util.List<String> args;
- @Parameter(names = { "-i",
- "--stdin" }, description = "Read token from standard input")
+ @Parameter(names = {"-i",
+ "--stdin"}, description = "Read token from standard input")
private Boolean stdin = false;
- @Parameter(names = { "-f",
- "--token-file" }, description = "Read token from a file")
+ @Parameter(names = {"-f",
+ "--token-file"}, description = "Read token from a file")
private String tokenFile;
public void run() throws Exception {
@@ -187,7 +192,8 @@ public class TokensCliUtils {
token = System.getenv("TOKEN");
} else {
System.err.println(
- "Token needs to be either passed as an argument or through `--stdin`, `--token-file` or by the `TOKEN` environment variable");
+ "Token needs to be either passed as an argument or through `--stdin`,"
+ + " `--token-file` or by the `TOKEN` environment variable");
System.exit(1);
return;
}
@@ -202,27 +208,29 @@ public class TokensCliUtils {
@Parameters(commandDescription = "Validate a token against a key")
public static class CommandValidateToken {
- @Parameter(names = { "-a",
- "--signature-algorithm" }, description = "The signature algorithm for the key pair if using public key.")
+ @Parameter(names = {"-a",
+ "--signature-algorithm"}, description = "The signature algorithm for the key pair if using public key.")
SignatureAlgorithm algorithm = SignatureAlgorithm.RS256;
@Parameter(description = "The token string", arity = 1)
private java.util.List<String> args;
- @Parameter(names = { "-i",
- "--stdin" }, description = "Read token from standard input")
+ @Parameter(names = {"-i",
+ "--stdin"}, description = "Read token from standard input")
private Boolean stdin = false;
- @Parameter(names = { "-f",
- "--token-file" }, description = "Read token from a file")
+ @Parameter(names = {"-f",
+ "--token-file"}, description = "Read token from a file")
private String tokenFile;
- @Parameter(names = { "-sk",
- "--secret-key" }, description = "Pass the secret key for validating the token. This can either be: data:, file:, etc..")
+ @Parameter(names = {"-sk",
+ "--secret-key"},
+ description = "Pass the secret key for validating the token. This can either be: data:, file:, etc..")
private String secretKey;
- @Parameter(names = { "-pk",
- "--public-key" }, description = "Pass the public key for validating the token. This can either be: data:, file:, etc..")
+ @Parameter(names = {"-pk",
+ "--public-key"},
+ description = "Pass the public key for validating the token. This can either be: data:, file:, etc..")
private String publicKey;
public void run() throws Exception {
@@ -249,7 +257,8 @@ public class TokensCliUtils {
token = System.getenv("TOKEN");
} else {
System.err.println(
- "Token needs to be either passed as an argument or through `--stdin`, `--token-file` or by the `TOKEN` environment variable");
+ "Token needs to be either passed as an argument or through `--stdin`,"
+ + " `--token-file` or by the `TOKEN` environment variable");
System.exit(1);
return;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ad9939c..302f327 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -1132,7 +1132,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
try {
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true,
- NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName);
+ NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
} catch (Exception e) {
fail("split bundle shouldn't have thrown exception");
}
@@ -1161,7 +1161,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Test
public void testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws Exception {
- conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName);
+ conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
// Force to create a topic
final String namespace = "prop-xyz/ns1";
List<String> topicNames = Lists.newArrayList(
@@ -1195,7 +1195,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
}
producers.forEach(Producer::closeAsync);
- conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.rangeEquallyDivideName);
+ conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index b819601..0fb9b4c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -114,7 +114,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
NamespaceBundle originalBundle = bundles.findBundle(topicName);
// Split bundle and take ownership of split bundles
- CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result.get();
@@ -194,7 +194,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
assertNotNull(list);
// Split bundle and take ownership of split bundles
- CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result.get();
} catch (Exception e) {
@@ -409,7 +409,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
NamespaceBundle originalBundle = bundles.findBundle(topicName);
// Split bundle and take ownership of split bundles
- CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result.get();
@@ -474,7 +474,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
NamespaceBundle originalBundle = bundles.findBundle(topicName);
- CompletableFuture<Void> result1 = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result1 = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result1.get();
} catch (Exception e) {
@@ -493,7 +493,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
}
});
- CompletableFuture<Void> result2 = namespaceService.splitAndOwnBundle(splittedBundle, true, NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result2 = namespaceService.splitAndOwnBundle(splittedBundle, true, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result2.get();
} catch (Exception e) {