You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/08/30 14:10:26 UTC
[pulsar] branch master updated: Add checkstyle validation and fix style violations in the common module (#4989)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d9c0a10 Add checkstyle validation and fix style violations in the common module (#4989)
d9c0a10 is described below
commit d9c0a1054310e9428007d016895d4174b0d20f89
Author: Sergii Zhevzhyk <vz...@users.noreply.github.com>
AuthorDate: Fri Aug 30 16:10:21 2019 +0200
Add checkstyle validation and fix style violations in the common module (#4989)
* Add checkstyle validation and fix style violations.
* Add the first batch of javadoc comments.
* Add javadoc comments in the pulsar common module
* Substitute the suppression warnings with javadoc comments in the pulsar common module
---
pulsar-common/pom.xml | 20 ++++
.../client/api/url/DataURLStreamHandler.java | 6 ++
.../api/url/PulsarURLStreamHandlerFactory.java | 4 +
.../java/org/apache/pulsar/client/api/url/URL.java | 3 +
.../api/url/package-info.java} | 9 +-
.../common/allocator/PulsarByteBufAllocator.java | 3 +
.../package-info.java} | 9 +-
.../pulsar/common/api/raw/MessageParser.java | 20 ++--
.../apache/pulsar/common/api/raw/RawMessage.java | 18 ++--
.../apache/pulsar/common/api/raw/RawMessageId.java | 3 +
.../common/api/raw/ReferenceCountedObject.java | 5 +
.../raw/{RawMessageId.java => package-info.java} | 9 +-
.../common/compression/CompressionCodec.java | 9 +-
.../common/compression/CompressionCodecLZ4.java | 8 +-
.../common/compression/CompressionCodecNone.java | 6 +-
.../compression/CompressionCodecProvider.java | 9 ++
.../common/compression/CompressionCodecSnappy.java | 7 +-
.../common/compression/CompressionCodecZLib.java | 2 +-
.../common/compression/CompressionCodecZstd.java | 2 +-
.../package-info.java} | 9 +-
.../common/conf/InternalConfigurationData.java | 3 +
.../RawMessageId.java => conf/package-info.java} | 9 +-
.../pulsar/common/functions/ConsumerConfig.java | 9 +-
.../pulsar/common/functions/FunctionConfig.java | 25 ++++-
.../pulsar/common/functions/FunctionState.java | 15 ++-
.../apache/pulsar/common/functions/Resources.java | 18 +++-
.../pulsar/common/functions/UpdateOptions.java | 4 +-
.../org/apache/pulsar/common/functions/Utils.java | 15 +--
.../pulsar/common/functions/WindowConfig.java | 8 +-
.../apache/pulsar/common/functions/WorkerInfo.java | 5 +-
.../package-info.java} | 9 +-
.../pulsar/common/io/ConnectorDefinition.java | 15 +--
.../org/apache/pulsar/common/io/SinkConfig.java | 15 ++-
.../org/apache/pulsar/common/io/SourceConfig.java | 17 +++-
.../raw/RawMessageId.java => io/package-info.java} | 9 +-
.../pulsar/common/lookup/data/LookupData.java | 23 +++--
.../data/package-info.java} | 9 +-
.../org/apache/pulsar/common/naming/Constants.java | 3 +
.../org/apache/pulsar/common/naming/Metadata.java | 3 +
.../apache/pulsar/common/naming/NamespaceName.java | 12 ++-
.../apache/pulsar/common/naming/ServiceUnitId.java | 13 ++-
.../apache/pulsar/common/naming/TopicDomain.java | 3 +
.../org/apache/pulsar/common/naming/TopicName.java | 48 ++++-----
.../RawMessageId.java => naming/package-info.java} | 9 +-
.../org/apache/pulsar/common/nar/FileUtils.java | 10 +-
.../apache/pulsar/common/nar/NarClassLoader.java | 8 +-
.../org/apache/pulsar/common/nar/NarUnpacker.java | 5 +-
.../RawMessageId.java => nar/package-info.java} | 9 +-
.../RawMessageId.java => net/package-info.java} | 9 +-
.../common/partition/PartitionedTopicMetadata.java | 3 +
.../package-info.java} | 9 +-
.../pulsar/common/policies/AutoFailoverPolicy.java | 7 +-
.../common/policies/NamespaceIsolationPolicy.java | 23 +++--
.../pulsar/common/policies/data/AuthAction.java | 8 +-
.../pulsar/common/policies/data/AuthPolicies.java | 9 +-
.../policies/data/AutoFailoverPolicyData.java | 8 +-
.../policies/data/AutoFailoverPolicyType.java | 7 +-
.../pulsar/common/policies/data/BacklogQuota.java | 21 ++--
.../policies/data/BookieAffinityGroupData.java | 3 +
.../pulsar/common/policies/data/BookieInfo.java | 3 +
.../policies/data/BookiesRackConfiguration.java | 5 +-
.../common/policies/data/BrokerAssignment.java | 3 +
.../data/BrokerNamespaceIsolationData.java | 6 +-
.../pulsar/common/policies/data/BrokerStatus.java | 3 +
.../pulsar/common/policies/data/BundlesData.java | 6 +-
.../pulsar/common/policies/data/ClusterData.java | 6 +-
.../pulsar/common/policies/data/ConsumerStats.java | 31 +++---
.../pulsar/common/policies/data/DispatchRate.java | 6 +-
.../pulsar/common/policies/data/ErrorData.java | 3 +
.../common/policies/data/ExceptionInformation.java | 3 +
.../pulsar/common/policies/data/FailureDomain.java | 3 +
.../pulsar/common/policies/data/FunctionStats.java | 110 ++++++++++++---------
.../common/policies/data/FunctionStatus.java | 14 ++-
.../pulsar/common/policies/data/LocalPolicies.java | 3 +
.../policies/data/NamespaceIsolationData.java | 7 +-
.../policies/data/NamespaceOwnershipStatus.java | 6 ++
.../policies/data/NonPersistentPublisherStats.java | 3 +-
.../data/NonPersistentReplicatorStats.java | 5 +-
.../data/NonPersistentSubscriptionStats.java | 5 +-
.../policies/data/NonPersistentTopicStats.java | 18 ++--
.../data/PartitionedTopicInternalStats.java | 7 +-
.../policies/data/PartitionedTopicStats.java | 7 +-
.../common/policies/data/PersistencePolicies.java | 6 +-
.../policies/data/PersistentOfflineTopicStats.java | 29 +++---
.../data/PersistentTopicInternalStats.java | 12 ++-
.../common/policies/data/PersistentTopicStats.java | 2 +-
.../pulsar/common/policies/data/Policies.java | 28 +++++-
.../common/policies/data/PublisherStats.java | 25 ++---
.../common/policies/data/ReplicatorStats.java | 27 ++---
.../pulsar/common/policies/data/ResourceQuota.java | 54 +++++-----
.../common/policies/data/RetentionPolicies.java | 1 +
.../SchemaAutoUpdateCompatibilityStrategy.java | 2 +-
.../pulsar/common/policies/data/SinkStatus.java | 14 ++-
.../pulsar/common/policies/data/SourceStatus.java | 14 ++-
.../pulsar/common/policies/data/SubscribeRate.java | 3 +
.../common/policies/data/SubscriptionAuthMode.java | 6 +-
.../common/policies/data/SubscriptionStats.java | 29 +++---
.../pulsar/common/policies/data/TenantInfo.java | 7 +-
.../pulsar/common/policies/data/TopicStats.java | 28 +++---
.../policies/data/WorkerFunctionInstanceStats.java | 8 +-
.../data/package-info.java} | 9 +-
.../policies/impl/AutoFailoverPolicyFactory.java | 3 +
.../common/policies/impl/MinAvailablePolicy.java | 9 +-
.../policies/impl/NamespaceIsolationPolicies.java | 39 ++++----
.../impl/NamespaceIsolationPolicyImpl.java | 25 ++---
.../impl/package-info.java} | 9 +-
.../package-info.java} | 9 +-
.../apache/pulsar/common/protocol/ByteBufPair.java | 8 +-
.../pulsar/common/protocol/CommandUtils.java | 6 +-
.../apache/pulsar/common/protocol/Commands.java | 50 ++++++----
.../org/apache/pulsar/common/protocol/Markers.java | 11 +--
.../pulsar/common/protocol/PulsarDecoder.java | 7 +-
.../pulsar/common/protocol/PulsarHandler.java | 9 +-
.../package-info.java} | 9 +-
.../common/protocol/schema/BytesSchemaVersion.java | 22 +++--
.../protocol/schema/DeleteSchemaResponse.java | 3 +
.../common/protocol/schema/EmptyVersion.java | 3 +
.../schema/GetAllVersionsSchemaResponse.java | 6 +-
.../common/protocol/schema/GetSchemaResponse.java | 3 +
.../protocol/schema/IsCompatibilityResponse.java | 3 +
.../common/protocol/schema/LatestVersion.java | 3 +
.../protocol/schema/LongSchemaVersionResponse.java | 3 +
.../common/protocol/schema/PostSchemaPayload.java | 3 +
.../common/protocol/schema/PostSchemaResponse.java | 3 +
.../pulsar/common/protocol/schema/SchemaData.java | 5 +-
.../common/protocol/schema/SchemaInfoUtil.java | 7 +-
.../common/protocol/schema/SchemaVersion.java | 3 +
.../schema/package-info.java} | 9 +-
.../common/sasl/JAASCredentialsContainer.java | 10 +-
.../apache/pulsar/common/sasl/KerberosName.java | 51 +++++-----
.../pulsar/common/sasl/TGTRefreshThread.java | 5 +-
.../RawMessageId.java => sasl/package-info.java} | 9 +-
.../apache/pulsar/common/stats/AllocatorStats.java | 15 +++
.../common/stats/JvmDefaultGCMetricsLogger.java | 9 +-
.../pulsar/common/stats/JvmG1GCMetricsLogger.java | 3 +
.../pulsar/common/stats/JvmGCMetricsLogger.java | 6 +-
.../org/apache/pulsar/common/stats/JvmMetrics.java | 12 +--
.../org/apache/pulsar/common/stats/Metrics.java | 19 ++--
.../RawMessageId.java => stats/package-info.java} | 9 +-
.../common/util/ClientSslContextRefresher.java | 5 +-
.../apache/pulsar/common/util/DateFormatter.java | 2 +-
.../common/util/DefaultSslContextBuilder.java | 1 +
.../org/apache/pulsar/common/util/FieldParser.java | 56 +++++------
.../common/util/FileModifiedTimeUpdater.java | 7 +-
.../org/apache/pulsar/common/util/FutureUtil.java | 5 +-
.../java/org/apache/pulsar/common/util/Hash.java | 7 +-
.../apache/pulsar/common/util/KeyStoreHolder.java | 5 +
.../apache/pulsar/common/util/Murmur3_32Hash.java | 7 +-
.../util/NamespaceBundleStatsComparator.java | 1 -
.../pulsar/common/util/NettySslContextBuilder.java | 10 +-
.../apache/pulsar/common/util/NumberFormat.java | 3 +
.../pulsar/common/util/ObjectMapperFactory.java | 1 +
.../org/apache/pulsar/common/util/RateLimiter.java | 50 ++++------
.../pulsar/common/util/RelativeTimeUtil.java | 5 +-
.../apache/pulsar/common/util/SecurityUtility.java | 28 +++---
.../common/util/SslContextAutoRefreshBuilder.java | 8 +-
.../common/util/collections/ConcurrentBitSet.java | 3 +
.../util/collections/ConcurrentLongHashMap.java | 20 ++--
.../util/collections/ConcurrentLongPairSet.java | 44 +++++----
.../util/collections/ConcurrentOpenHashMap.java | 17 ++--
.../util/collections/ConcurrentOpenHashSet.java | 19 ++--
.../ConcurrentOpenLongPairRangeSet.java | 21 ++--
.../collections/ConcurrentSortedLongPairSet.java | 9 +-
.../collections/GrowableArrayBlockingQueue.java | 7 +-
.../collections/GrowablePriorityLongPairQueue.java | 39 ++++----
.../common/util/collections/LongPairRangeSet.java | 76 ++++++++------
.../common/util/collections/LongPairSet.java | 34 +++----
.../util/collections/TripleLongPriorityQueue.java | 26 ++---
.../collections/package-info.java} | 9 +-
.../pulsar/common/util/netty/EventLoopUtil.java | 6 +-
.../netty/package-info.java} | 9 +-
.../RawMessageId.java => util/package-info.java} | 9 +-
.../util/protobuf/ByteBufCodedInputStream.java | 5 +-
.../util/protobuf/ByteBufCodedOutputStream.java | 5 +-
.../protobuf/package-info.java} | 9 +-
.../policies/data/loadbalancer/BrokerUsage.java | 14 +--
.../policies/data/loadbalancer/JSONWritable.java | 7 +-
.../policies/data/loadbalancer/JvmUsage.java | 2 +-
.../data/loadbalancer/LoadManagerReport.java | 43 ++++----
.../policies/data/loadbalancer/LoadReport.java | 13 ++-
.../data/loadbalancer/LoadReportDeserializer.java | 9 +-
.../data/loadbalancer/LocalBrokerData.java | 12 +--
.../data/loadbalancer/NamespaceBundleStats.java | 18 ++--
.../policies/data/loadbalancer/NamespaceUsage.java | 28 +++---
.../data/loadbalancer/ResourceUnitRanking.java | 53 +++++-----
.../policies/data/loadbalancer/ResourceUsage.java | 5 +-
.../data/loadbalancer/ServiceLookupData.java | 12 ++-
.../data/loadbalancer/SystemResourceUsage.java | 4 +-
.../data/loadbalancer/package-info.java} | 9 +-
189 files changed, 1388 insertions(+), 993 deletions(-)
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 95c8f78..081a642 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -156,6 +156,26 @@
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>check-style</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+ <suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+ <encoding>UTF-8</encoding>
+ <excludes>**/proto/*</excludes>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
index e70dcec..6dbfd73 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
@@ -31,8 +31,14 @@ import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+/**
+ * Extension of the {@code URLStreamHandler} class to handle all stream protocol handlers.
+ */
public class DataURLStreamHandler extends URLStreamHandler {
+ /**
+ * Representation of a communications link between the application and a URL.
+ */
static class DataURLConnection extends URLConnection {
private boolean parsed = false;
private String contentType;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
index b09d384..b083109 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
@@ -23,6 +23,10 @@ import java.net.URLStreamHandlerFactory;
import java.util.HashMap;
import java.util.Map;
+/**
+ * This class defines a factory for {@code URL} stream
+ * protocol handlers.
+ */
public class PulsarURLStreamHandlerFactory implements URLStreamHandlerFactory {
private static final Map<String, Class<? extends URLStreamHandler>> handlers;
static {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java
index e5246b7..b203737 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java
@@ -25,6 +25,9 @@ import java.net.URISyntaxException;
import java.net.URLConnection;
import java.net.URLStreamHandlerFactory;
+/**
+ * Wrapper around {@code java.net.URL} to improve usability.
+ */
public class URL {
private static final URLStreamHandlerFactory urlStreamHandlerFactory = new PulsarURLStreamHandlerFactory();
private final java.net.URL url;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/package-info.java
similarity index 91%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/client/api/url/package-info.java
index 1151443..322f4cb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes to work with URLs.
+ */
+package org.apache.pulsar.client.api.url;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
index a324e7f..49312e9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
@@ -32,6 +32,9 @@ import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+/**
+ * Holder of a ByteBuf allocator.
+ */
@UtilityClass
@Slf4j
public class PulsarByteBufAllocator {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/package-info.java
similarity index 89%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/allocator/package-info.java
index 1151443..e4ca036 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes implementing pulsar allocator.
+ */
+package org.apache.pulsar.common.allocator;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index 2f9c169..b791a80 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -24,22 +24,26 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
-
import java.io.IOException;
-
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+/**
+ * Helper class to work with a raw Pulsar entry payload.
+ */
@UtilityClass
@Slf4j
public class MessageParser {
+
+ /**
+ * Definition of an interface to process a raw Pulsar entry payload.
+ */
public interface MessageProcessor {
void process(RawMessage message);
}
@@ -64,7 +68,8 @@ public class MessageParser {
try {
msgMetadata = Commands.parseMessageMetadata(payload);
} catch (Throwable t) {
- log.warn("[{}] Failed to deserialize metadata for message {}:{} - Ignoring", topicName, ledgerId, entryId);
+ log.warn("[{}] Failed to deserialize metadata for message {}:{} - Ignoring",
+ topicName, ledgerId, entryId);
return;
}
@@ -90,10 +95,11 @@ public class MessageParser {
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
processor.process(
- RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload.retain(), ledgerId, entryId, 0));
+ RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload.retain(), ledgerId, entryId, 0));
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
- receiveIndividualMessagesFromBatch(refCntMsgMetadata, uncompressedPayload, ledgerId, entryId, processor);
+ receiveIndividualMessagesFromBatch(
+ refCntMsgMetadata, uncompressedPayload, ledgerId, entryId, processor);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
index 512dfef..ce5d940 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
@@ -26,26 +26,26 @@ import java.util.Optional;
/**
* View of a message that exposes the internal direct-memory buffer for more efficient processing.
*
- * The message needs to be released when the processing is done.
+ * <p>The message needs to be released when the processing is done.
*/
public interface RawMessage {
/**
- * Release all the resources associated with this raw message
+ * Release all the resources associated with this raw message.
*/
void release();
/**
* Return the properties attached to the message.
*
- * Properties are application defined key/value pairs that will be attached to the message
+ * <p>Properties are application defined key/value pairs that will be attached to the message.
*
* @return an unmodifiable view of the properties map
*/
Map<String, String> getProperties();
/**
- * Get the content of the message
+ * Get the content of the message.
*
* @return the byte array with the message payload
*/
@@ -54,9 +54,10 @@ public interface RawMessage {
/**
* Get the unique message ID associated with this message.
*
- * The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
+ * <p>The message id can be used to univocally refer to a message
+ * without having the keep the entire payload in memory.
*
- * Only messages received from the consumer will have a message id assigned.
+ * <p>Only messages received from the consumer will have a message id assigned.
*
* @return the message id null if this message was not received by this client instance
*/
@@ -74,8 +75,7 @@ public interface RawMessage {
* Get the event time associated with this message. It is typically set by the applications via
* {@link MessageBuilder#setEventTime(long)}.
*
- * <p>
- * If there isn't any event time associated with this event, it will return 0.
+ * <p>If there isn't any event time associated with this event, it will return 0.
*/
long getEventTime();
@@ -96,7 +96,7 @@ public interface RawMessage {
String getProducerName();
/**
- * Get the key of the message
+ * Get the key of the message.
*
* @return the key of the message
*/
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
index 1151443..e096dcd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.api.raw;
+/**
+ * Interface to uniquely identify an internal raw message.
+ */
public interface RawMessageId {
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java
index 7c58013..79c33fe 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java
@@ -23,6 +23,11 @@ import io.netty.util.ReferenceCounted;
import java.util.function.Consumer;
+/**
+ * Class representing a reference-counted object that requires explicit deallocation.
+ *
+ * @param <T> type of the object that requires explicit deallocation.
+ */
public class ReferenceCountedObject<T> extends AbstractReferenceCounted {
private final T object;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/package-info.java
similarity index 89%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/package-info.java
index 1151443..17dc4d2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes implementing raw API messages.
+ */
+package org.apache.pulsar.common.api.raw;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodec.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodec.java
index d0ddecb..d96dc7f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodec.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodec.java
@@ -18,17 +18,16 @@
*/
package org.apache.pulsar.common.compression;
-import java.io.IOException;
-
import io.netty.buffer.ByteBuf;
+import java.io.IOException;
/**
- * Generic compression codec interface
+ * Generic compression codec interface.
*/
public interface CompressionCodec {
/**
- * Compress a buffer
+ * Compress a buffer.
*
* @param raw
* a buffer with the uncompressed content. The reader/writer indexes will not be modified
@@ -39,7 +38,7 @@ public interface CompressionCodec {
/**
* Decompress a buffer.
*
- * The buffer needs to have been compressed with the matching Encoder.
+ * <p>The buffer needs to have been compressed with the matching Encoder.
*
* @param encoded
* the compressed content
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
index 417a1b3..beda57b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
@@ -19,20 +19,16 @@
package org.apache.pulsar.common.compression;
import io.netty.buffer.ByteBuf;
-
import java.io.IOException;
import java.nio.ByteBuffer;
-
import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
/**
- * LZ4 Compression
+ * LZ4 Compression.
*/
@Slf4j
public class CompressionCodecLZ4 implements CompressionCodec {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecNone.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecNone.java
index 3b36263..e90d4c8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecNone.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecNone.java
@@ -18,10 +18,12 @@
*/
package org.apache.pulsar.common.compression;
-import java.io.IOException;
-
import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+/**
+ * No compression.
+ */
public class CompressionCodecNone implements CompressionCodec {
@Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
index f108f2d..8cbe0f4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
@@ -25,6 +25,15 @@ import lombok.experimental.UtilityClass;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.common.api.proto.PulsarApi;
+/**
+ * Provider of compression codecs used in Pulsar.
+ *
+ * @see CompressionCodecNone
+ * @see CompressionCodecLZ4
+ * @see CompressionCodecZLib
+ * @see CompressionCodecZstd
+ * @see CompressionCodecSnappy
+ */
@UtilityClass
public class CompressionCodecProvider {
private static final EnumMap<PulsarApi.CompressionType, CompressionCodec> codecs;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
index 1598d5e..7e06025 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
@@ -20,14 +20,13 @@ package org.apache.pulsar.common.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
-import lombok.extern.slf4j.Slf4j;
-import org.xerial.snappy.Snappy;
-
import java.io.IOException;
import java.nio.ByteBuffer;
+import lombok.extern.slf4j.Slf4j;
+import org.xerial.snappy.Snappy;
/**
- * Snappy Compression
+ * Snappy Compression.
*/
@Slf4j
public class CompressionCodecSnappy implements CompressionCodec {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java
index da413f3..d061231 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java
@@ -31,7 +31,7 @@ import java.util.zip.Inflater;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
/**
- * ZLib Compression
+ * ZLib Compression.
*/
public class CompressionCodecZLib implements CompressionCodec {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
index 04c5aa8..3a45896 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
@@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
/**
- * Zstandard Compression
+ * Zstandard Compression.
*/
public class CompressionCodecZstd implements CompressionCodec {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/package-info.java
similarity index 88%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/compression/package-info.java
index 1151443..57fb039 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Implementation of different compression codecs.
+ */
+package org.apache.pulsar.common.compression;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
index 558c1aa..08664da 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.common.conf;
import com.google.common.base.MoreObjects;
import java.util.Objects;
+/**
+ * Internal configuration data.
+ */
public class InternalConfigurationData {
private String zookeeperServers;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/package-info.java
similarity index 90%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/conf/package-info.java
index 1151443..28e95d0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes for internal configuration.
+ */
+package org.apache.pulsar.common.conf;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
index 0f27471..d503b10 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
@@ -18,8 +18,15 @@
*/
package org.apache.pulsar.common.functions;
-import lombok.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+/**
+ * Configuration of a consumer.
+ */
@Data
@Builder
@NoArgsConstructor
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 1686434..65cce3b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -22,27 +22,42 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.util.Collection;
import java.util.Map;
-import java.util.TreeMap;
-import lombok.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+/**
+ * Configuration of Pulsar Function.
+ */
@Getter
@Setter
@Data
@EqualsAndHashCode
@ToString
-@Builder(toBuilder=true)
+@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class FunctionConfig {
+ /**
+ * Definition of possible processing guarantees.
+ */
public enum ProcessingGuarantees {
ATLEAST_ONCE,
ATMOST_ONCE,
EFFECTIVELY_ONCE
}
+ /**
+ * Definition of possible runtime environments.
+ */
public enum Runtime {
JAVA,
PYTHON,
@@ -63,7 +78,7 @@ public class FunctionConfig {
private Map<String, String> customSchemaInputs;
/**
- * A generalized way of specifying inputs
+ * A generalized way of specifying inputs.
*/
private Map<String, ConsumerConfig> inputSpecs;
@@ -71,7 +86,7 @@ public class FunctionConfig {
/**
* Represents either a builtin schema type (eg: 'avro', 'json', ect) or the class name for a Schema
- * implementation
+ * implementation.
*/
private String outputSchemaType;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
index 5b7b0ca..28d54bd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
@@ -19,15 +19,24 @@
package org.apache.pulsar.common.functions;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import lombok.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+/**
+ * Function state.
+ */
@Getter
@Setter
@Data
@EqualsAndHashCode
@ToString
-@Builder(toBuilder=true)
+@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
index 06513d6..436bb4d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
@@ -18,8 +18,18 @@
*/
package org.apache.pulsar.common.functions;
-import lombok.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+/**
+ * Class representing resources, such as CPU, RAM, and disk size.
+ */
@Getter
@Setter
@Data
@@ -27,7 +37,7 @@ import lombok.*;
@ToString
@AllArgsConstructor
@NoArgsConstructor
-@Builder(toBuilder=true)
+@Builder(toBuilder = true)
public class Resources {
private static final Resources DEFAULT = new Resources();
@@ -35,9 +45,9 @@ public class Resources {
// Default cpu is 1 core
private Double cpu = 1d;
// Default memory is 1GB
- private Long ram = 1073741824l;
+ private Long ram = 1073741824L;
// Default disk is 10GB
- private Long disk = 10737418240l;
+ private Long disk = 10737418240L;
public static Resources getDefaultResources() {
return DEFAULT;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java
index d1186ca..108ccac 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java
@@ -20,10 +20,12 @@ package org.apache.pulsar.common.functions;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+/**
+ * Options while updating the sink.
+ */
@Data
@NoArgsConstructor
@ApiModel(value = "UpdateOptions", description = "Options while updating the sink")
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
index b226c90..d614f6e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
@@ -18,17 +18,20 @@
*/
package org.apache.pulsar.common.functions;
-import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.io.SourceConfig;
-
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+
+/**
+ * Helper class to work with configuration.
+ */
public class Utils {
- public static String HTTP = "http";
- public static String FILE = "file";
- public static String BUILTIN = "builtin";
+ public final static String HTTP = "http";
+ public final static String FILE = "file";
+ public final static String BUILTIN = "builtin";
public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) {
return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
index 755201a..1b4d581 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
@@ -18,9 +18,15 @@
*/
package org.apache.pulsar.common.functions;
-import lombok.*;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
import lombok.experimental.Accessors;
+/**
+ * Configuration of a windowing function.
+ */
@Data
@Setter
@Getter
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
index b06503a..245cb7d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
@@ -24,8 +24,11 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
+/**
+ * Worker information.
+ */
@Getter
-@AllArgsConstructor(access=AccessLevel.PRIVATE)
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
@NoArgsConstructor
@ToString
public class WorkerInfo {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/package-info.java
similarity index 89%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/functions/package-info.java
index 1151443..ef7100a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes needed for pulsar functions.
+ */
+package org.apache.pulsar.common.functions;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java
index e5fa3bd..d1bb334 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/ConnectorDefinition.java
@@ -21,31 +21,34 @@ package org.apache.pulsar.common.io;
import lombok.Data;
import lombok.NoArgsConstructor;
+/**
+ * Basic information about a Pulsar connector.
+ */
@Data
@NoArgsConstructor
public class ConnectorDefinition {
/**
- * The name of the connector type
+ * The name of the connector type.
*/
private String name;
/**
- * Description to be used for user help
+ * Description to be used for user help.
*/
private String description;
/**
* The class name for the connector source implementation.
- * <p>
- * If not defined, it will be assumed this connector cannot act as a data source
+ *
+ * <p>If not defined, it will be assumed this connector cannot act as a data source.
*/
private String sourceClass;
/**
* The class name for the connector sink implementation.
- * <p>
- * If not defined, it will be assumed this connector cannot act as a data si
+ *
+ * <p>If not defined, it will be assumed this connector cannot act as a data sink.
*/
private String sinkClass;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index 6e51cbb..ae0537e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -20,19 +20,28 @@ package org.apache.pulsar.common.io;
import java.util.Collection;
import java.util.Map;
-import java.util.TreeMap;
-import lombok.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
+/**
+ * Configuration of Pulsar Sink.
+ */
@Getter
@Setter
@Data
@EqualsAndHashCode
@ToString
-@Builder(toBuilder=true)
+@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class SinkConfig {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 7d557c9..887c3b2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -18,18 +18,27 @@
*/
package org.apache.pulsar.common.io;
-import lombok.*;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
-import java.util.Map;
-
+/**
+ * Pulsar source configuration.
+ */
@Getter
@Setter
@Data
@EqualsAndHashCode
@ToString
-@Builder(toBuilder=true)
+@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class SourceConfig {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/package-info.java
similarity index 91%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/io/package-info.java
index 1151443..4f642d6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Configuration classes for IO.
+ */
+package org.apache.pulsar.common.io;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/LookupData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/LookupData.java
index 9776a5d..3c2eefa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/LookupData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/LookupData.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.common.lookup.data;
import com.google.common.base.MoreObjects;
+/**
+ * This class encapsulates lookup data.
+ */
public class LookupData {
private String brokerUrl;
private String brokerUrlTls;
@@ -37,7 +40,7 @@ public class LookupData {
this.httpUrlTls = httpUrlTls;
this.nativeUrl = brokerUrl;
}
-
+
public LookupData(String brokerUrl, String brokerUrlTls, boolean redirect, boolean authoritative) {
this.brokerUrl = brokerUrl;
this.brokerUrlTls = brokerUrlTls;
@@ -55,17 +58,17 @@ public class LookupData {
public String getHttpUrl() {
return httpUrl;
}
-
+
public String getHttpUrlTls() {
- return httpUrlTls;
- }
+ return httpUrlTls;
+ }
- public void setHttpUrlTls(String httpUrlTls) {
- this.httpUrlTls = httpUrlTls;
- }
+ public void setHttpUrlTls(String httpUrlTls) {
+ this.httpUrlTls = httpUrlTls;
+ }
/**
- * Legacy name, but client libraries are still using it so it needs to be included in Json
+ * Legacy name, but client libraries are still using it so it needs to be included in Json.
*/
@Deprecated
public String getNativeUrl() {
@@ -75,8 +78,8 @@ public class LookupData {
/**
* "brokerUrlSsl" is needed in the serialized Json for compatibility reasons.
*
- * Older C++ pulsar client library version will fail the lookup if this field is not included, even though it's not
- * used
+ * <p>Older C++ pulsar client library version will fail the lookup if this field is not included,
+ * even though it's not used.
*/
@Deprecated
public String getBrokerUrlSsl() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/package-info.java
similarity index 91%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/package-info.java
index 1151443..3b4e1e9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Lookup data.
+ */
+package org.apache.pulsar.common.lookup.data;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java
index aed20fa..5cb7f33 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.naming;
+/**
+ * Definition of constants.
+ */
public class Constants {
public static final String GLOBAL_CLUSTER = "global";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java
index 283b4f8..7e81b0e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.common.naming;
import java.util.Map;
+/**
+ * Validator for metadata configuration.
+ */
public class Metadata {
private static final int MAX_METADATA_SIZE = 1024; // 1 Kb
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
index 485a498..1a6870e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
@@ -20,15 +20,17 @@ package org.apache.pulsar.common.naming;
import static com.google.common.base.Preconditions.checkNotNull;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.base.Objects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+/**
+ * Parser of a value from the namespace field provided in configuration.
+ */
public class NamespaceName implements ServiceUnitId {
private final String namespace;
@@ -127,7 +129,7 @@ public class NamespaceName implements ServiceUnitId {
}
/**
- * Compose the topic name from namespace + topic
+ * Compose the topic name from namespace + topic.
*
* @param domain
* @param topic
@@ -208,7 +210,7 @@ public class NamespaceName implements ServiceUnitId {
}
/**
- * Returns true if this is a V2 namespace prop/namespace-name
+ * Returns true if this is a V2 namespace prop/namespace-name.
* @return true if v2
*/
public boolean isV2() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/ServiceUnitId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/ServiceUnitId.java
index 5492389..381aa4d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/ServiceUnitId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/ServiceUnitId.java
@@ -18,24 +18,27 @@
*/
package org.apache.pulsar.common.naming;
+/**
+ * Basic interface for service unit's identification.
+ */
public interface ServiceUnitId {
@Override
- public abstract String toString();
+ String toString();
/**
- * Return the namespace object that this <code>ServiceUnitId</code> belongs to
+ * Return the namespace object that this <code>ServiceUnitId</code> belongs to.
*
* @return NamespaceName object
*/
- public abstract NamespaceName getNamespaceObject();
+ NamespaceName getNamespaceObject();
/**
- * Check whether a fully-qualified topic is included in this <code>ServiceUnitId</code> object
+ * Check whether a fully-qualified topic is included in this <code>ServiceUnitId</code> object.
*
* @param topicName
* a fully-qualified topic object
* @return true or false
*/
- public abstract boolean includes(TopicName topicName);
+ boolean includes(TopicName topicName);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicDomain.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicDomain.java
index 26b382f..4ad91d6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicDomain.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicDomain.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.naming;
+/**
+ * Enumeration showing if a topic is persistent.
+ */
public enum TopicDomain {
persistent("persistent"), non_persistent("non-persistent");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index eef05f3..258d6cb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -18,22 +18,20 @@
*/
package org.apache.pulsar.common.naming;
+import com.google.common.base.Objects;
+import com.google.common.base.Splitter;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Objects;
-import com.google.common.base.Splitter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-
/**
* Encapsulate the parsing of the completeTopicName name.
*/
@@ -113,7 +111,8 @@ public class TopicName implements ServiceUnitId {
if (parts.length == 3) {
completeTopicName = TopicDomain.persistent.name() + "://" + completeTopicName;
} else if (parts.length == 1) {
- completeTopicName = TopicDomain.persistent.name() + "://" + PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
+ completeTopicName = TopicDomain.persistent.name() + "://"
+ + PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
} else {
throw new IllegalArgumentException(
"Invalid short topic name '" + completeTopicName + "', it should be in the format of "
@@ -184,7 +183,7 @@ public class TopicName implements ServiceUnitId {
/**
* Extract the namespace portion out of a completeTopicName name.
*
- * Works both with old & new convention.
+ * <p>Works both with old &amp; new convention.
*
* @return the namespace
*/
@@ -193,7 +192,7 @@ public class TopicName implements ServiceUnitId {
}
/**
- * Get the namespace object that this completeTopicName belongs to
+ * Get the namespace object that this completeTopicName belongs to.
*
* @return namespace object
*/
@@ -236,7 +235,8 @@ public class TopicName implements ServiceUnitId {
}
/**
- * @return partition index of the completeTopicName. It returns -1 if the completeTopicName (topic) is not partitioned.
+ * @return partition index of the completeTopicName.
+ * It returns -1 if the completeTopicName (topic) is not partitioned.
*/
public int getPartitionIndex() {
return partitionIndex;
@@ -247,10 +247,11 @@ public class TopicName implements ServiceUnitId {
}
/**
- * For partitions in a topic, return the base partitioned topic name
+ * For partitions in a topic, return the base partitioned topic name.
* Eg:
* <ul>
- * <li><code>persistent://prop/cluster/ns/my-topic-partition-1</code> --> <code>persistent://prop/cluster/ns/my-topic</code>
+ * <li><code>persistent://prop/cluster/ns/my-topic-partition-1</code> -->
+ * <code>persistent://prop/cluster/ns/my-topic</code>
* <li><code>persistent://prop/cluster/ns/my-topic</code> --> <code>persistent://prop/cluster/ns/my-topic</code>
* </ul>
*/
@@ -263,7 +264,8 @@ public class TopicName implements ServiceUnitId {
}
/**
- * @return partition index of the completeTopicName. It returns -1 if the completeTopicName (topic) is not partitioned.
+ * @return partition index of the completeTopicName.
+ * It returns -1 if the completeTopicName (topic) is not partitioned.
*/
public static int getPartitionIndex(String topic) {
int partitionIndex = -1;
@@ -279,9 +281,8 @@ public class TopicName implements ServiceUnitId {
}
/**
- * Returns the http rest path for use in the admin web service
+ * Returns the http rest path for use in the admin web service.
* Eg:
- *
* * "persistent/my-tenant/my-namespace/my-topic"
* * "non-persistent/my-tenant/my-namespace/my-topic"
*
@@ -314,11 +315,12 @@ public class TopicName implements ServiceUnitId {
}
/**
- * Get a string suitable for completeTopicName lookup
- * <p>
- * Example:
- * <p>
- * persistent://tenant/cluster/namespace/completeTopicName -> persistent/tenant/cluster/namespace/completeTopicName
+ * Get a string suitable for completeTopicName lookup.
+ *
+ * <p>Example:
+ *
+ * <p>persistent://tenant/cluster/namespace/completeTopicName ->
+ * persistent/tenant/cluster/namespace/completeTopicName
*
* @return
*/
@@ -366,7 +368,7 @@ public class TopicName implements ServiceUnitId {
}
/**
- * Returns true if this a V2 topic name prop/ns/topic-name
+ * Returns true if this a V2 topic name prop/ns/topic-name.
* @return true if V2
*/
public boolean isV2() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/package-info.java
similarity index 89%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/naming/package-info.java
index 1151443..8816d8d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes to work with different configuration values.
+ */
+package org.apache.pulsar.common.naming;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
index 20bc5e6..6e34e78 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
@@ -129,7 +129,8 @@ public class FileUtils {
* @throws IOException if abstract pathname does not denote a directory, or
* if an I/O error occurs
*/
- public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger) throws IOException {
+ public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter,
+ final Logger logger) throws IOException {
FileUtils.deleteFilesInDirectory(directory, filter, logger, false);
}
@@ -145,7 +146,8 @@ public class FileUtils {
* @throws IOException if abstract pathname does not denote a directory, or
* if an I/O error occurs
*/
- public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) throws IOException {
+ public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger,
+ final boolean recurse) throws IOException {
FileUtils.deleteFilesInDirectory(directory, filter, logger, recurse, false);
}
@@ -163,7 +165,9 @@ public class FileUtils {
* @throws IOException if abstract pathname does not denote a directory, or
* if an I/O error occurs
*/
- public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) throws IOException {
+ public static void deleteFilesInDirectory(
+ final File directory, final FilenameFilter filter, final Logger logger,
+ final boolean recurse, final boolean deleteEmptyDirectories) throws IOException {
// ensure the specified directory is actually a directory and that it exists
if (null != directory && directory.isDirectory()) {
final File ingestFiles[] = directory.listFiles();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index b375787..7ee22d6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -38,7 +38,6 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
-
import lombok.extern.slf4j.Slf4j;
/**
@@ -141,13 +140,14 @@ public class NarClassLoader extends URLClassLoader {
public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
try {
- return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader() );
+ return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
- public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent) throws IOException {
+ public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent)
+ throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
try {
return new NarClassLoader(unpacked, additionalJars, parent);
@@ -198,7 +198,7 @@ public class NarClassLoader extends URLClassLoader {
}
/**
- * Read a service definition as a String
+ * Read a service definition as a String.
*/
public String getServiceDefinition(String serviceName) throws IOException {
String serviceDefPath = narWorkingDirectory + "/META-INF/services/" + serviceName;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
index c8ca0e4..5383124 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
@@ -39,10 +39,13 @@ import java.util.jar.JarFile;
import lombok.extern.slf4j.Slf4j;
+/**
+ * Helper class to unpack NARs.
+ */
@Slf4j
public class NarUnpacker {
- private static String HASH_FILENAME = "nar-md5sum";
+ private final static String HASH_FILENAME = "nar-md5sum";
/**
* Unpacks the specified nar into the specified base working directory.
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/package-info.java
similarity index 91%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/nar/package-info.java
index 1151443..1a7857b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes to work with NARs.
+ */
+package org.apache.pulsar.common.nar;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/net/package-info.java
similarity index 90%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/net/package-info.java
index 1151443..bd828d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/net/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes for network configuration.
+ */
+package org.apache.pulsar.common.net;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
index 3bf8e57..1f023de 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.partition;
+/**
+ * Metadata of a partitioned topic.
+ */
public class PartitionedTopicMetadata {
/* Number of partitions for the topic */
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/package-info.java
similarity index 91%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/partition/package-info.java
index 1151443..d2d7029 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Common partitioning.
+ */
+package org.apache.pulsar.common.partition;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/AutoFailoverPolicy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/AutoFailoverPolicy.java
index 61e5280..a494938 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/AutoFailoverPolicy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/AutoFailoverPolicy.java
@@ -22,10 +22,13 @@ import java.util.SortedSet;
import org.apache.pulsar.common.policies.data.BrokerStatus;
+/**
+ * Basic definition of an auto-failover policy.
+ */
public abstract class AutoFailoverPolicy {
/**
- * Checks to see whether the new namespace ownership should be failed over to the secondary brokers
+ * Checks to see whether the new namespace ownership should be failed over to the secondary brokers.
*
* @param brokerStatus
* @return
@@ -35,7 +38,7 @@ public abstract class AutoFailoverPolicy {
public abstract boolean shouldFailoverToSecondary(int totalPrimaryCandidates);
/**
- * Determine whether a broker is considered available or not
+ * Determine whether a broker is considered available or not.
*
* @param brokerStatus
* @return
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
index 8d83ea2..b003ead 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
@@ -25,24 +25,27 @@ import java.util.SortedSet;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BrokerStatus;
+/**
+ * Namespace isolation policy.
+ */
public interface NamespaceIsolationPolicy {
/**
- * Get the list of regex for the set of primary brokers
+ * Get the list of regex for the set of primary brokers.
*
* @return
*/
List<String> getPrimaryBrokers();
/**
- * Get the list of regex for the set of secondary brokers
+ * Get the list of regex for the set of secondary brokers.
*
* @return
*/
List<String> getSecondaryBrokers();
/**
- * Get the list of primary brokers for the namespace according to the policy
+ * Get the list of primary brokers for the namespace according to the policy.
*
* @param availableBrokers
* @param namespace
@@ -51,7 +54,7 @@ public interface NamespaceIsolationPolicy {
List<URL> findPrimaryBrokers(List<URL> availableBrokers, NamespaceName namespace);
/**
- * Get the list of secondary brokers for the namespace according to the policy
+ * Get the list of secondary brokers for the namespace according to the policy.
*
* @param availableBrokers
* @param namespace
@@ -60,7 +63,7 @@ public interface NamespaceIsolationPolicy {
List<URL> findSecondaryBrokers(List<URL> availableBrokers, NamespaceName namespace);
/**
- * Check to see whether the primary brokers can still handle a new namespace or has to failover
+ * Check to see whether the primary brokers can still handle a new namespace or has to failover.
*
* @param primaryCandidates
* @return
@@ -68,7 +71,7 @@ public interface NamespaceIsolationPolicy {
boolean shouldFailover(SortedSet<BrokerStatus> primaryCandidates);
/**
- * Check to see whether the primary brokers can still handle a new namespace or has to failover
+ * Check to see whether the primary brokers can still handle a new namespace or has to failover.
*
* @param totalPrimaryCandidates
* @return
@@ -76,7 +79,7 @@ public interface NamespaceIsolationPolicy {
boolean shouldFailover(int totalPrimaryCandidates);
/**
- * Check to see whether the namespace ownership should fallback to the primary brokers
+ * Check to see whether the namespace ownership should fallback to the primary brokers.
*
* @param primaryBrokers
* @return
@@ -84,7 +87,7 @@ public interface NamespaceIsolationPolicy {
boolean shouldFallback(SortedSet<BrokerStatus> primaryBrokers);
/**
- * Check to see whether the specific host is a primary broker
+ * Check to see whether the specific host is a primary broker.
*
* @param brokerAddress
* @return
@@ -92,7 +95,7 @@ public interface NamespaceIsolationPolicy {
boolean isPrimaryBroker(String brokerAddress);
/**
- * Check to see whether the specific host is a secondary broker
+ * Check to see whether the specific host is a secondary broker.
*
* @param brokerAddress
* @return
@@ -100,7 +103,7 @@ public interface NamespaceIsolationPolicy {
boolean isSecondaryBroker(String brokerAddress);
/**
- * According to the namespace isolation policy, find the allowed available primary brokers
+ * According to the namespace isolation policy, find the allowed available primary brokers.
*
* @param primaryCandidates
* @return
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
index faef870..6f70e96 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
@@ -19,15 +19,15 @@
package org.apache.pulsar.common.policies.data;
/**
- * Authorization action for Pulsar policies
+ * Authorization action for Pulsar policies.
*/
public enum AuthAction {
- /** Permission to produce/publish messages */
+ /** Permission to produce/publish messages. */
produce,
- /** Permission to consume messages */
+ /** Permission to consume messages. */
consume,
- /** Permissions for functions ops **/
+ /** Permissions for functions ops. **/
functions,
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
index 82e6a7e..2390200 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
@@ -18,15 +18,20 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import com.google.common.collect.Maps;
-
+/**
+ * Authentication policies.
+ */
public class AuthPolicies {
+ @SuppressWarnings("checkstyle:MemberName")
public final Map<String, Set<AuthAction>> namespace_auth;
+ @SuppressWarnings("checkstyle:MemberName")
public final Map<String, Map<String, Set<AuthAction>>> destination_auth;
+ @SuppressWarnings("checkstyle:MemberName")
public final Map<String, Set<String>> subscription_auth_roles;
public AuthPolicies() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoFailoverPolicyData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoFailoverPolicyData.java
index 3963bc9..d958a57 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoFailoverPolicyData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoFailoverPolicyData.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Objects;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.Map;
-
import org.apache.pulsar.common.policies.impl.AutoFailoverPolicyFactory;
-import com.google.common.base.Objects;
-
+/**
+ * The auto failover policy configuration data.
+ */
@ApiModel(
value = "AutoFailoverPolicyData",
description = "The auto failover policy configuration data"
@@ -38,6 +39,7 @@ public class AutoFailoverPolicyData {
value = "The auto failover policy type",
allowableValues = "min_available"
)
+ @SuppressWarnings("checkstyle:MemberName")
public AutoFailoverPolicyType policy_type;
@ApiModelProperty(
name = "parameters",
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoFailoverPolicyType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoFailoverPolicyType.java
index c457f0d..57229e6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoFailoverPolicyType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoFailoverPolicyType.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.common.policies.data;
import io.swagger.annotations.ApiModel;
+/**
+ * The policy type of auto failover.
+ */
@ApiModel(
value = "AutoFailoverPolicyType",
description = "The policy type of auto failover."
)
public enum AutoFailoverPolicyType {
- min_available
-
- ;
+ min_available;
public static AutoFailoverPolicyType fromString(String autoFailoverPolicyTypeName) {
for (AutoFailoverPolicyType autoFailoverPolicyType : AutoFailoverPolicyType.values()) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
index c468975..9c74158 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
@@ -18,14 +18,13 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.Objects;
-
import com.google.common.base.MoreObjects;
+import java.util.Objects;
/**
* Unit of a backlog quota configuration for a scoped resource in a Pulsar instance.
- * <p>
- * A scoped resource is identified by a {@link BacklogQuotaType} enumeration type which is containing two attributes:
+ *
+ * <p>A scoped resource is identified by a {@link BacklogQuotaType} enumeration type which is containing two attributes:
* <code>limit</code> representing a quota limit in bytes and <code>policy</code> for backlog retention policy.
*/
public class BacklogQuota {
@@ -33,7 +32,7 @@ public class BacklogQuota {
private RetentionPolicy policy;
/**
- * Gets quota limit in bytes
+ * Gets quota limit in bytes.
*
* @return quota limit in bytes
*/
@@ -46,7 +45,7 @@ public class BacklogQuota {
}
/**
- * Sets quota limit in bytes
+ * Sets quota limit in bytes.
*
* @param limit
* quota limit in bytes
@@ -89,21 +88,21 @@ public class BacklogQuota {
/**
* Identifier to a backlog quota configuration (an instance of {@link BacklogQuota}).
*/
- public static enum BacklogQuotaType {
+ public enum BacklogQuotaType {
destination_storage;
}
/**
* Enumeration type determines how to retain backlog against the resource shortages.
*/
- public static enum RetentionPolicy {
- /** Policy which holds producer's send request until the resource becomes available (or holding times out) */
+ public enum RetentionPolicy {
+ /** Policy which holds producer's send request until the resource becomes available (or holding times out). */
producer_request_hold,
- /** Policy which throws javax.jms.ResourceAllocationException to the producer */
+ /** Policy which throws javax.jms.ResourceAllocationException to the producer. */
producer_exception,
- /** Policy which evicts the oldest message from the slowest consumer's backlog */
+ /** Policy which evicts the oldest message from the slowest consumer's backlog. */
consumer_backlog_eviction,
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieAffinityGroupData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieAffinityGroupData.java
index 098d4d3..15f8334 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieAffinityGroupData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieAffinityGroupData.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.common.policies.data;
import com.google.common.base.Objects;
+/**
+ * Description of a BookKeeper's affinity group.
+ */
public class BookieAffinityGroupData {
public String bookkeeperAffinityGroupPrimary;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java
index a0d4a0e..6d5eb79 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookieInfo.java
@@ -22,6 +22,9 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+/**
+ * Bookie information.
+ */
@Data
@AllArgsConstructor
@NoArgsConstructor
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java
index a2741b2..f65d5d9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java
@@ -24,11 +24,14 @@ import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
+/**
+ * Configuration for a rack of BookKeepers.
+ */
public class BookiesRackConfiguration extends TreeMap<String, Map<String, BookieInfo>> {
public boolean removeBookie(String address) {
for (Map<String, BookieInfo> m : values()) {
- if (m.remove(address) != null ) {
+ if (m.remove(address) != null) {
return true;
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerAssignment.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerAssignment.java
index 6c91772..b165104 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerAssignment.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerAssignment.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.policies.data;
+/**
+ * Definition of possible broker assignments.
+ */
public enum BrokerAssignment {
primary, secondary, shared
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java
index e20fbe9..53faf0a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java
@@ -18,12 +18,14 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.google.common.base.Objects;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.List;
-import com.google.common.base.Objects;
-
+/**
+ * The namespace isolation data for a given broker.
+ */
@ApiModel(
value = "BrokerNamespaceIsolationData",
description = "The namespace isolation data for a given broker"
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerStatus.java
index f76b39b..c7c2a8e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerStatus.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.common.policies.data;
import com.google.common.collect.ComparisonChain;
+/**
+ * Information about the broker status.
+ */
public class BrokerStatus implements Comparable<BrokerStatus> {
private String brokerAddress;
private boolean active;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BundlesData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BundlesData.java
index 47486e1..4bc95cb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BundlesData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BundlesData.java
@@ -18,11 +18,13 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.google.common.base.MoreObjects;
import java.util.List;
import java.util.Objects;
-import com.google.common.base.MoreObjects;
-
+/**
+ * Holder for bundles.
+ */
public class BundlesData {
public List<String> boundaries;
public int numBundles;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
index 538ed48..e5b2859 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
@@ -20,13 +20,15 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.MoreObjects;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.LinkedHashSet;
import java.util.Objects;
-import com.google.common.base.MoreObjects;
-
+/**
+ * The configuration data for a cluster.
+ */
@ApiModel(
value = "ClusterData",
description = "The configuration data for a cluster"
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index b8e2ee1..f929e22 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -18,52 +18,53 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.Map;
-
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Map;
+
/**
+ * Consumer statistics.
*/
public class ConsumerStats {
- /** Total rate of messages delivered to the consumer. msg/s */
+ /** Total rate of messages delivered to the consumer (msg/s). */
public double msgRateOut;
- /** Total throughput delivered to the consumer. bytes/s */
+ /** Total throughput delivered to the consumer (bytes/s). */
public double msgThroughputOut;
- /** Total rate of messages redelivered by this consumer. msg/s */
+ /** Total rate of messages redelivered by this consumer (msg/s). */
public double msgRateRedeliver;
- /** Name of the consumer */
+ /** Name of the consumer. */
public String consumerName;
- /** Number of available message permits for the consumer */
+ /** Number of available message permits for the consumer. */
public int availablePermits;
- /** Number of unacknowledged messages for the consumer */
+ /** Number of unacknowledged messages for the consumer. */
public int unackedMessages;
- /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */
+ /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */
public boolean blockedConsumerOnUnackedMsgs;
- /** Address of this consumer */
+ /** Address of this consumer. */
private int addressOffset = -1;
private int addressLength;
- /** Timestamp of connection */
+ /** Timestamp of connection. */
private int connectedSinceOffset = -1;
private int connectedSinceLength;
- /** Client library version */
+ /** Client library version. */
private int clientVersionOffset = -1;
private int clientVersionLength;
- /** Metadata (key/value strings) associated with this consumer */
+ /** Metadata (key/value strings) associated with this consumer. */
public Map<String, String> metadata;
/**
- * In order to prevent multiple string object allocation under stats: create a string-buffer that stores data for all string
- * place-holders
+ * In order to prevent multiple string object allocation under stats: create a string-buffer
+ * that stores data for all string place-holders.
*/
private StringBuilder stringBuffer = new StringBuilder();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java
index 90e4e66..c93a713 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java
@@ -18,10 +18,12 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.Objects;
-
import com.google.common.base.MoreObjects;
+import java.util.Objects;
+/**
+ * Dispatch rate.
+ */
public class DispatchRate {
public int dispatchThrottlingRateInMsg = -1;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ErrorData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ErrorData.java
index 40f51f1..c5892df 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ErrorData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ErrorData.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.policies.data;
+/**
+ * Class holding data in case of error responses.
+ */
public class ErrorData {
public String reason;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ExceptionInformation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ExceptionInformation.java
index 20eb0d5..574f600 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ExceptionInformation.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ExceptionInformation.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.common.policies.data;
import lombok.Data;
+/**
+ * Exception information.
+ */
@Data
public class ExceptionInformation {
String exceptionString;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java
index cf0ef4c..dbe77d8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java
@@ -25,6 +25,9 @@ import io.swagger.annotations.ApiModelProperty;
import java.util.HashSet;
import java.util.Set;
+/**
+ * The data of a failure domain configuration in a cluster.
+ */
@ApiModel(
value = "FailureDomain",
description = "The data of a failure domain configuration in a cluster"
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
index ff73272..8ccf071 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStats.java
@@ -21,106 +21,121 @@ package org.apache.pulsar.common.policies.data;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
-import lombok.Data;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import lombok.Data;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+/**
+ * Statistics for Pulsar Function.
+ */
@Data
@JsonInclude(JsonInclude.Include.ALWAYS)
-@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "instances" })
+@JsonPropertyOrder({"receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal",
+ "avgProcessLatency", "1min", "lastInvocation", "instances"})
public class FunctionStats {
/**
- * Overall total number of records function received from source
+ * Overall total number of records function received from source.
**/
public long receivedTotal;
/**
- * Overall total number of records successfully processed by user function
+ * Overall total number of records successfully processed by user function.
**/
public long processedSuccessfullyTotal;
/**
- * Overall total number of system exceptions thrown
+ * Overall total number of system exceptions thrown.
**/
public long systemExceptionsTotal;
/**
- * Overall total number of user exceptions thrown
+ * Overall total number of user exceptions thrown.
**/
public long userExceptionsTotal;
/**
- * Average process latency for function
+ * Average process latency for function.
**/
public Double avgProcessLatency;
@JsonProperty("1min")
- public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new FunctionInstanceStats.FunctionInstanceStatsDataBase();
+ public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin =
+ new FunctionInstanceStats.FunctionInstanceStatsDataBase();
/**
- * Timestamp of when the function was last invoked by any instance
+ * Timestamp of when the function was last invoked by any instance.
**/
public Long lastInvocation;
+ /**
+ * Function instance statistics.
+ */
@Data
@JsonInclude(JsonInclude.Include.ALWAYS)
@JsonPropertyOrder({ "instanceId", "metrics" })
public static class FunctionInstanceStats {
- /** Instance Id of function instance **/
+ /** Instance Id of function instance. **/
public int instanceId;
- @Data
+ /**
+ * Function instance statistics data base.
+ */
+ @Data
@JsonInclude(JsonInclude.Include.ALWAYS)
- @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency" })
+ @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal",
+ "userExceptionsTotal", "avgProcessLatency" })
public static class FunctionInstanceStatsDataBase {
/**
- * Total number of records function received from source for instance
+ * Total number of records function received from source for instance.
**/
public long receivedTotal;
/**
- * Total number of records successfully processed by user function for instance
+ * Total number of records successfully processed by user function for instance.
**/
public long processedSuccessfullyTotal;
/**
- * Total number of system exceptions thrown for instance
+ * Total number of system exceptions thrown for instance.
**/
public long systemExceptionsTotal;
/**
- * Total number of user exceptions thrown for instance
+ * Total number of user exceptions thrown for instance.
**/
public long userExceptionsTotal;
/**
- * Average process latency for function for instance
+ * Average process latency for function for instance.
**/
public Double avgProcessLatency;
}
+ /**
+ * Function instance statistics data.
+ */
@Data
@JsonInclude(JsonInclude.Include.ALWAYS)
- @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "userMetrics" })
+ @JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal",
+ "userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "userMetrics" })
public static class FunctionInstanceStatsData extends FunctionInstanceStatsDataBase {
@JsonProperty("1min")
public FunctionInstanceStatsDataBase oneMin = new FunctionInstanceStatsDataBase();
/**
- * Timestamp of when the function was last invoked for instance
+ * Timestamp of when the function was last invoked for instance.
**/
public Long lastInvocation;
/**
- * Map of user defined metrics
+ * Map of user defined metrics.
**/
public Map<String, Double> userMetrics = new HashMap<>();
}
@@ -139,37 +154,38 @@ public class FunctionStats {
int nonNullInstances = 0;
int nonNullInstancesOneMin = 0;
for (FunctionInstanceStats functionInstanceStats : instances) {
- FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = functionInstanceStats.getMetrics();
- receivedTotal += functionInstanceStatsData.receivedTotal;
- processedSuccessfullyTotal += functionInstanceStatsData.processedSuccessfullyTotal;
- systemExceptionsTotal += functionInstanceStatsData.systemExceptionsTotal;
- userExceptionsTotal += functionInstanceStatsData.userExceptionsTotal;
- if (functionInstanceStatsData.avgProcessLatency != null) {
- if (avgProcessLatency == null) {
- avgProcessLatency = 0.0;
- }
- avgProcessLatency += functionInstanceStatsData.avgProcessLatency;
- nonNullInstances ++;
+ FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData =
+ functionInstanceStats.getMetrics();
+ receivedTotal += functionInstanceStatsData.receivedTotal;
+ processedSuccessfullyTotal += functionInstanceStatsData.processedSuccessfullyTotal;
+ systemExceptionsTotal += functionInstanceStatsData.systemExceptionsTotal;
+ userExceptionsTotal += functionInstanceStatsData.userExceptionsTotal;
+ if (functionInstanceStatsData.avgProcessLatency != null) {
+ if (avgProcessLatency == null) {
+ avgProcessLatency = 0.0;
}
+ avgProcessLatency += functionInstanceStatsData.avgProcessLatency;
+ nonNullInstances++;
+ }
- oneMin.receivedTotal += functionInstanceStatsData.oneMin.receivedTotal;
- oneMin.processedSuccessfullyTotal += functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
- oneMin.systemExceptionsTotal += functionInstanceStatsData.oneMin.systemExceptionsTotal;
- oneMin.userExceptionsTotal += functionInstanceStatsData.oneMin.userExceptionsTotal;
- if (functionInstanceStatsData.oneMin.avgProcessLatency != null) {
- if (oneMin.avgProcessLatency == null) {
- oneMin.avgProcessLatency = 0.0;
- }
- oneMin.avgProcessLatency += functionInstanceStatsData.oneMin.avgProcessLatency;
- nonNullInstancesOneMin ++;
+ oneMin.receivedTotal += functionInstanceStatsData.oneMin.receivedTotal;
+ oneMin.processedSuccessfullyTotal += functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
+ oneMin.systemExceptionsTotal += functionInstanceStatsData.oneMin.systemExceptionsTotal;
+ oneMin.userExceptionsTotal += functionInstanceStatsData.oneMin.userExceptionsTotal;
+ if (functionInstanceStatsData.oneMin.avgProcessLatency != null) {
+ if (oneMin.avgProcessLatency == null) {
+ oneMin.avgProcessLatency = 0.0;
}
+ oneMin.avgProcessLatency += functionInstanceStatsData.oneMin.avgProcessLatency;
+ nonNullInstancesOneMin++;
+ }
- if (functionInstanceStatsData.lastInvocation != null) {
- if (lastInvocation == null || functionInstanceStatsData.lastInvocation > lastInvocation) {
- lastInvocation = functionInstanceStatsData.lastInvocation;
- }
+ if (functionInstanceStatsData.lastInvocation != null) {
+ if (lastInvocation == null || functionInstanceStatsData.lastInvocation > lastInvocation) {
+ lastInvocation = functionInstanceStatsData.lastInvocation;
}
}
+ }
// calculate average from sum
if (nonNullInstances > 0) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStatus.java
index 4c76f0f..30b50b8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FunctionStatus.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.common.policies.data;
-import lombok.Data;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import lombok.Data;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+/**
+ * Data class containing status of Pulsar Function.
+ */
@Data
public class FunctionStatus {
@@ -32,11 +34,17 @@ public class FunctionStatus {
public int numRunning;
public List<FunctionInstanceStatus> instances = new LinkedList<>();
+ /**
+ * Function instance status.
+ */
@Data
public static class FunctionInstanceStatus {
public int instanceId;
public FunctionInstanceStatusData status;
+ /**
+ * Function instance status data.
+ */
@Data
public static class FunctionInstanceStatusData {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
index c8ae18a..4393f7c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
@@ -22,6 +22,9 @@ import static org.apache.pulsar.common.policies.data.Policies.defaultBundle;
import com.google.common.base.Objects;
+/**
+ * Local policies.
+ */
public class LocalPolicies {
public BundlesData bundles;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
index 4aa03b5..c3cfeed 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
@@ -20,13 +20,15 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Objects;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.ArrayList;
import java.util.List;
-import com.google.common.base.Objects;
-
+/**
+ * The data of namespace isolation configuration.
+ */
@ApiModel(
value = "NamespaceIsolationData",
description = "The data of namespace isolation configuration"
@@ -59,6 +61,7 @@ public class NamespaceIsolationData {
+ " }"
+ "}"
)
+ @SuppressWarnings("checkstyle:MemberName")
public AutoFailoverPolicyData auto_failover_policy;
@Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOwnershipStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOwnershipStatus.java
index 16c0ecd..39a7e3a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOwnershipStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOwnershipStatus.java
@@ -18,10 +18,16 @@
*/
package org.apache.pulsar.common.policies.data;
+/**
+ * Information about the namespace's ownership.
+ */
public class NamespaceOwnershipStatus {
+ @SuppressWarnings("checkstyle:MemberName")
public BrokerAssignment broker_assignment;
+ @SuppressWarnings("checkstyle:MemberName")
public boolean is_controlled;
+ @SuppressWarnings("checkstyle:MemberName")
public boolean is_active;
public NamespaceOwnershipStatus() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java
index cab1a31..0d661dd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java
@@ -21,11 +21,12 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkNotNull;
/**
+ * Non-persistent publisher statistics.
*/
public class NonPersistentPublisherStats extends PublisherStats {
/**
* for non-persistent topic: broker drops msg if publisher publishes messages more than configured max inflight
- * messages per connection
+ * messages per connection.
**/
public double msgDropRate;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java
index f43f7e5..906afde 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java
@@ -20,10 +20,13 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkNotNull;
+/**
+ * Statistics for a non-persistent replicator.
+ */
public class NonPersistentReplicatorStats extends ReplicatorStats {
/**
- * for non-persistent topic: broker drops msg for replicator if replicator connection is not writable
+ * for non-persistent topic: broker drops msg for replicator if replicator connection is not writable.
**/
public double msgDropRate;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentSubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentSubscriptionStats.java
index 2e8c50e..896e677 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentSubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentSubscriptionStats.java
@@ -21,12 +21,13 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkNotNull;
/**
+ * Statistics for subscription to non-persistent topics.
*/
-public class NonPersistentSubscriptionStats extends SubscriptionStats{
+public class NonPersistentSubscriptionStats extends SubscriptionStats {
/**
* for non-persistent topic: broker drops msg for subscription if no consumer is available for message
- * delivery
+ * delivery.
**/
public double msgDropRate;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentTopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentTopicStats.java
index bef5609..c426ce9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentTopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentTopicStats.java
@@ -20,29 +20,29 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkNotNull;
-import java.util.List;
-import java.util.Map;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
/**
+ * Statistics for a non-persistent topic.
*/
public class NonPersistentTopicStats extends TopicStats {
/**
* for non-persistent topic: broker drops msg if publisher publishes messages more than configured max inflight
- * messages per connection
+ * messages per connection.
**/
public double msgDropRate;
- /** List of connected publishers on this topic w/ their stats */
+ /** List of connected publishers on this topic w/ their stats. */
public List<NonPersistentPublisherStats> publishers;
- /** Map of subscriptions with their individual statistics */
+ /** Map of subscriptions with their individual statistics. */
public Map<String, NonPersistentSubscriptionStats> subscriptions;
- /** Map of replication statistics by remote cluster context */
+ /** Map of replication statistics by remote cluster context. */
public Map<String, NonPersistentReplicatorStats> replication;
public NonPersistentTopicStats() {
@@ -57,14 +57,14 @@ public class NonPersistentTopicStats extends TopicStats {
}
// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
- // stats
+ // stats.
public NonPersistentTopicStats add(NonPersistentTopicStats stats) {
checkNotNull(stats);
super.add(stats);
this.msgDropRate += stats.msgDropRate;
return this;
}
-
+
public List<NonPersistentPublisherStats> getPublishers() {
return this.publishers;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicInternalStats.java
index 5725d3b..d20e03f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicInternalStats.java
@@ -18,12 +18,13 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.google.common.collect.Maps;
import java.util.Map;
-
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import com.google.common.collect.Maps;
-
+/**
+ * Internal statistics for a partitioned topic.
+ */
public class PartitionedTopicInternalStats {
public PartitionedTopicMetadata metadata;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
index 4db5397..cce7828 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
@@ -18,12 +18,13 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.google.common.collect.Maps;
import java.util.Map;
-
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import com.google.common.collect.Maps;
-
+/**
+ * Statistics for a partitioned topic.
+ */
public class PartitionedTopicStats extends TopicStats {
public PartitionedTopicMetadata metadata;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
index 08e94a0..466b338 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
@@ -18,10 +18,12 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.Objects;
-
import com.google.common.base.MoreObjects;
+import java.util.Objects;
+/**
+ * Configuration of bookkeeper persistence policies.
+ */
public class PersistencePolicies {
private int bookkeeperEnsemble;
private int bookkeeperWriteQuorum;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentOfflineTopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentOfflineTopicStats.java
index 2c4227b..87e858f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentOfflineTopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentOfflineTopicStats.java
@@ -18,39 +18,38 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
/**
- * This object is populated using meta data in zookeeper without actually bringing the topic online
+ * This object is populated using meta data in zookeeper without actually bringing the topic online.
*/
public class PersistentOfflineTopicStats {
- /** Space used to store the messages for the topic. bytes */
+ /** Space used to store the messages for the topic (bytes). */
public long storageSize;
- /** Total number of messages */
+ /** Total number of messages. */
public long totalMessages;
- /** Total backlog */
+ /** Total backlog. */
public long messageBacklog;
- /** Broker host where this stat was generated */
+ /** Broker host where this stat was generated. */
public final String brokerName;
- /** offline topic name */
+ /** Offline topic name. */
public final String topicName;
- /** data ledger ids */
+ /** Data ledger ids. */
public List<LedgerDetails> dataLedgerDetails;
- /** cursor ledger ids and backlog */
+ /** Cursor ledger ids and backlog. */
public Map<String, CursorDetails> cursorDetails;
- /** timestamp when stat was generated */
+ /** Timestamp when stat was generated. */
public Date statGeneratedAt;
public PersistentOfflineTopicStats(String topicName, String brokerName) {
@@ -70,6 +69,9 @@ public class PersistentOfflineTopicStats {
this.statGeneratedAt.setTime(System.currentTimeMillis());
}
+ /**
+ * Details about a cursor.
+ */
public class CursorDetails {
public long cursorBacklog;
public long cursorLedgerId;
@@ -88,6 +90,9 @@ public class PersistentOfflineTopicStats {
this.dataLedgerDetails.add(new LedgerDetails(entries, timestamp, size, ledgerId));
}
+ /**
+ * Details about a ledger.
+ */
public class LedgerDetails {
public long entries;
public long timestamp;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index c0416f5..aa9c595 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.common.policies.data;
import java.util.List;
import java.util.Map;
+/**
+ * Persistent topic internal statistics.
+ */
public class PersistentTopicInternalStats {
public long entriesAddedCounter;
@@ -41,6 +44,9 @@ public class PersistentTopicInternalStats {
public List<LedgerInfo> ledgers;
public Map<String, CursorStats> cursors;
+ /**
+ * Ledger information.
+ */
public static class LedgerInfo {
public long ledgerId;
public long entries;
@@ -48,6 +54,9 @@ public class PersistentTopicInternalStats {
public boolean offloaded;
}
+ /**
+ * Pulsar cursor statistics.
+ */
public static class CursorStats {
public String markDeletePosition;
public String readPosition;
@@ -62,8 +71,7 @@ public class PersistentTopicInternalStats {
public String state;
public long numberOfEntriesSinceFirstNotAckedMessage;
public int totalNonContiguousDeletedMessagesRange;
-
+
public Map<String, Long> properties;
}
-
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicStats.java
index c093e44..e838562 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicStats.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.common.policies.data;
/**
- * Maintain compatibility with previous class name
+ * Maintain compatibility with previous class name.
*/
public class PersistentTopicStats extends TopicStats {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index efa18e4..5f23be7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -18,21 +18,26 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import com.google.common.collect.Sets;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
+/**
+ * Definition of Pulsar policies.
+ */
public class Policies {
+ @SuppressWarnings("checkstyle:MemberName")
public final AuthPolicies auth_policies = new AuthPolicies();
+ @SuppressWarnings("checkstyle:MemberName")
public Set<String> replication_clusters = Sets.newHashSet();
public BundlesData bundles;
+ @SuppressWarnings("checkstyle:MemberName")
public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map = Maps.newHashMap();
public Map<String, DispatchRate> topicDispatchRate = Maps.newHashMap();
public Map<String, DispatchRate> subscriptionDispatchRate = Maps.newHashMap();
@@ -43,8 +48,11 @@ public class Policies {
// If set, it will override the broker settings for enabling deduplication
public Boolean deduplicationEnabled = null;
+ @SuppressWarnings("checkstyle:MemberName")
public Map<String, Integer> latency_stats_sample_rate = Maps.newHashMap();
+ @SuppressWarnings("checkstyle:MemberName")
public int message_ttl_in_seconds = 0;
+ @SuppressWarnings("checkstyle:MemberName")
public RetentionPolicies retention_policies = null;
public boolean deleted = false;
public String antiAffinityGroup;
@@ -52,20 +60,30 @@ public class Policies {
public static final String FIRST_BOUNDARY = "0x00000000";
public static final String LAST_BOUNDARY = "0xffffffff";
+ @SuppressWarnings("checkstyle:MemberName")
public boolean encryption_required = false;
+ @SuppressWarnings("checkstyle:MemberName")
public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
+ @SuppressWarnings("checkstyle:MemberName")
public int max_producers_per_topic = 0;
+ @SuppressWarnings("checkstyle:MemberName")
public int max_consumers_per_topic = 0;
+ @SuppressWarnings("checkstyle:MemberName")
public int max_consumers_per_subscription = 0;
+ @SuppressWarnings("checkstyle:MemberName")
public long compaction_threshold = 0;
+ @SuppressWarnings("checkstyle:MemberName")
public long offload_threshold = -1;
+ @SuppressWarnings("checkstyle:MemberName")
public Long offload_deletion_lag_ms = null;
+ @SuppressWarnings("checkstyle:MemberName")
public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy =
SchemaAutoUpdateCompatibilityStrategy.Full;
+ @SuppressWarnings("checkstyle:MemberName")
public boolean schema_validation_enforced = false;
@Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
index 7514b2a..cec2257 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
@@ -18,50 +18,51 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.Map;
-
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Map;
+
/**
+ * Statistics about a publisher.
*/
public class PublisherStats {
private int count;
- /** Total rate of messages published by this publisher. msg/s */
+ /** Total rate of messages published by this publisher (msg/s). */
public double msgRateIn;
- /** Total throughput of messages published by this publisher. byte/s */
+ /** Total throughput of messages published by this publisher (byte/s). */
public double msgThroughputIn;
- /** Average message size published by this publisher */
+ /** Average message size published by this publisher. */
public double averageMsgSize;
- /** Id of this publisher */
+ /** Id of this publisher. */
public long producerId;
- /** Producer name */
+ /** Producer name. */
private int producerNameOffset = -1;
private int producerNameLength;
- /** Address of this publisher */
+ /** Address of this publisher. */
private int addressOffset = -1;
private int addressLength;
- /** Timestamp of connection */
+ /** Timestamp of connection. */
private int connectedSinceOffset = -1;
private int connectedSinceLength;
- /** Client library version */
+ /** Client library version. */
private int clientVersionOffset = -1;
private int clientVersionLength;
/**
* In order to prevent multiple string objects under stats: create a string-buffer that stores data for all string
- * place-holders
+ * place-holders.
*/
private StringBuilder stringBuffer = new StringBuilder();
- /** Metadata (key/value strings) associated with this publisher */
+ /** Metadata (key/value strings) associated with this publisher. */
public Map<String, String> metadata;
public PublisherStats add(PublisherStats stats) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java
index 9428f06..a3dbec1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java
@@ -20,42 +20,45 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkNotNull;
+/**
+ * Statistics about a replicator.
+ */
public class ReplicatorStats {
- /** Total rate of messages received from the remote cluster. msg/s */
+ /** Total rate of messages received from the remote cluster (msg/s). */
public double msgRateIn;
- /** Total throughput received from the remote cluster. bytes/s */
+ /** Total throughput received from the remote cluster (bytes/s). */
public double msgThroughputIn;
- /** Total rate of messages delivered to the replication-subscriber. msg/s */
+ /** Total rate of messages delivered to the replication-subscriber (msg/s). */
public double msgRateOut;
- /** Total throughput delivered to the replication-subscriber. bytes/s */
+ /** Total throughput delivered to the replication-subscriber (bytes/s). */
public double msgThroughputOut;
- /** Total rate of messages expired. msg/s */
+ /** Total rate of messages expired (msg/s). */
public double msgRateExpired;
- /** Number of messages pending to be replicated to remote cluster */
+ /** Number of messages pending to be replicated to remote cluster. */
public long replicationBacklog;
- /** is the replication-subscriber up and running to replicate to remote cluster */
+ /** Whether the replication-subscriber is up and running to replicate to the remote cluster. */
public boolean connected;
- /** Time in seconds from the time a message was produced to the time when it is about to be replicated */
+ /** Time in seconds from the time a message was produced to the time when it is about to be replicated. */
public long replicationDelayInSeconds;
- /** Address of incoming replication connection */
+ /** Address of incoming replication connection. */
public String inboundConnection;
- /** Timestamp of incoming connection establishment time */
+ /** Timestamp of incoming connection establishment time. */
public String inboundConnectedSince;
- /** Address of outbound replication connection */
+ /** Address of outbound replication connection. */
public String outboundConnection;
- /** Timestamp of outbound connection establishment time */
+ /** Timestamp of outbound connection establishment time. */
public String outboundConnectedSince;
public ReplicatorStats add(ReplicatorStats stats) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceQuota.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceQuota.java
index b12177e..c05fbd9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceQuota.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceQuota.java
@@ -50,8 +50,8 @@ public class ResourceQuota {
}
/**
- * Set incoming message rate quota
- *
+ * Set incoming message rate quota.
+ *
* @param msgRateIn
* incoming messages rate quota (msg/sec)
*/
@@ -60,8 +60,8 @@ public class ResourceQuota {
}
/**
- * Get incoming message rate quota
- *
+ * Get incoming message rate quota.
+ *
* @return incoming message rate quota (msg/sec)
*/
public double getMsgRateIn() {
@@ -69,8 +69,8 @@ public class ResourceQuota {
}
/**
- * Set outgoing message rate quota
- *
+ * Set outgoing message rate quota.
+ *
* @param msgRateOut
* outgoing messages rate quota (msg/sec)
*/
@@ -79,8 +79,8 @@ public class ResourceQuota {
}
/**
- * Get outgoing message rate quota
- *
+ * Get outgoing message rate quota.
+ *
* @return outgoing message rate quota (msg/sec)
*/
public double getMsgRateOut() {
@@ -88,8 +88,8 @@ public class ResourceQuota {
}
/**
- * Set inbound bandwidth quota
- *
+ * Set inbound bandwidth quota.
+ *
* @param bandwidthIn
* inbound bandwidth quota (bytes/sec)
*/
@@ -98,8 +98,8 @@ public class ResourceQuota {
}
/**
- * Get inbound bandwidth quota
- *
+ * Get inbound bandwidth quota.
+ *
* @return inbound bandwidth quota (bytes/sec)
*/
public double getBandwidthIn() {
@@ -107,8 +107,8 @@ public class ResourceQuota {
}
/**
- * Set outbound bandwidth quota
- *
+ * Set outbound bandwidth quota.
+ *
* @param bandwidthOut
* outbound bandwidth quota (bytes/sec)
*/
@@ -117,8 +117,8 @@ public class ResourceQuota {
}
/**
- * Get outbound bandwidth quota
- *
+ * Get outbound bandwidth quota.
+ *
* @return outbound bandwidth quota (bytes/sec)
*/
public double getBandwidthOut() {
@@ -126,8 +126,8 @@ public class ResourceQuota {
}
/**
- * Set memory quota
- *
+ * Set memory quota.
+ *
* @param memory
* memory quota (Mbytes)
*/
@@ -136,8 +136,8 @@ public class ResourceQuota {
}
/**
- * Get memory quota
- *
+ * Get memory quota.
+ *
* @return memory quota (Mbytes)
*/
public double getMemory() {
@@ -145,8 +145,8 @@ public class ResourceQuota {
}
/**
- * Set dynamic to true/false
- *
+ * Set dynamic to true/false.
+ *
* @param dynamic
* allow the quota to be dynamically re-calculated
*/
@@ -155,8 +155,8 @@ public class ResourceQuota {
}
/**
- * Get dynamic setting
- *
+ * Get dynamic setting.
+ *
* @return is dynamic or not
*/
public boolean getDynamic() {
@@ -164,7 +164,7 @@ public class ResourceQuota {
}
/**
- * Check if this is a valid quota definition
+ * Check if this is a valid quota definition.
*/
@JsonIgnore
public boolean isValid() {
@@ -178,7 +178,7 @@ public class ResourceQuota {
/**
* Add quota.
- *
+ *
* @param quota
* <code>ResourceQuota</code> to add
*/
@@ -192,7 +192,7 @@ public class ResourceQuota {
/**
* Subtract quota.
- *
+ *
* @param quota
* <code>ResourceQuota</code> to subtract
*/
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/RetentionPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/RetentionPolicies.java
index 86959e3..c4b8688 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/RetentionPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/RetentionPolicies.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.common.policies.data;
/**
+ * Definition of the retention policy.
*/
public class RetentionPolicies {
private int retentionTimeInMinutes;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
index 7a26c65..e10144e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java
@@ -66,7 +66,7 @@ public enum SchemaAutoUpdateCompatibilityStrategy {
ForwardTransitive,
/**
- * BackwardTransitive and ForwardTransitive
+ * BackwardTransitive and ForwardTransitive.
*/
FullTransitive
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
index 3b7a7a1..fe70422 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.common.policies.data;
-import lombok.Data;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import lombok.Data;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+/**
+ * Status of Pulsar Sink.
+ */
@Data
public class SinkStatus {
// The total number of sink instances that ought to be running
@@ -33,11 +35,17 @@ public class SinkStatus {
public int numRunning;
public List<SinkInstanceStatus> instances = new LinkedList<>();
+ /**
+ * Status of a Sink instance.
+ */
@Data
public static class SinkInstanceStatus {
public int instanceId;
public SinkInstanceStatusData status;
+ /**
+ * Status data of a Sink instance.
+ */
@Data
public static class SinkInstanceStatusData {
// Is this instance running?
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
index 3f103e0..3e531e9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.common.policies.data;
-import lombok.Data;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import lombok.Data;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+/**
+ * Source status.
+ */
@Data
public class SourceStatus {
// The total number of source instances that ought to be running
@@ -33,11 +35,17 @@ public class SourceStatus {
public int numRunning;
public List<SourceInstanceStatus> instances = new LinkedList<>();
+ /**
+ * Source instance status.
+ */
@Data
public static class SourceInstanceStatus {
public int instanceId;
public SourceInstanceStatusData status;
+ /**
+ * Source instance status data.
+ */
@Data
public static class SourceInstanceStatusData {
// Is this instance running?
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java
index 54fa2c7..113cf52 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java
@@ -22,6 +22,9 @@ import com.google.common.base.MoreObjects;
import java.util.Objects;
+/**
+ * Information about subscription rate.
+ */
public class SubscribeRate {
public int subscribeThrottlingRatePerConsumer = -1;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java
index 5a57147..9f9d24b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java
@@ -20,12 +20,12 @@
package org.apache.pulsar.common.policies.data;
/**
- * Subscription authorization for Pulsar policies
+ * Subscription authorization for Pulsar policies.
*/
public enum SubscriptionAuthMode {
- /** Every subscription name can be used by every role */
+ /** Every subscription name can be used by every role. */
None,
- /** Subscription name with auth role prefix can be used by the role */
+ /** Subscription name with auth role prefix can be used by the role. */
Prefix,
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 7cdf514..4406668 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -20,49 +20,48 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.collect.Lists;
import java.util.List;
-
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import com.google.common.collect.Lists;
-
/**
+ * Statistics about subscription.
*/
public class SubscriptionStats {
- /** Total rate of messages delivered on this subscription. msg/s */
+ /** Total rate of messages delivered on this subscription (msg/s). */
public double msgRateOut;
- /** Total throughput delivered on this subscription. bytes/s */
+ /** Total throughput delivered on this subscription (bytes/s). */
public double msgThroughputOut;
- /** Total rate of messages redelivered on this subscription. msg/s */
+ /** Total rate of messages redelivered on this subscription (msg/s). */
public double msgRateRedeliver;
- /** Number of messages in the subscription backlog */
+ /** Number of messages in the subscription backlog. */
public long msgBacklog;
- /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages */
+ /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages. */
public boolean blockedSubscriptionOnUnackedMsgs;
- /** Number of delayed messages currently being tracked */
+ /** Number of delayed messages currently being tracked. */
public long msgDelayed;
- /** Number of unacknowledged messages for the subscription */
+ /** Number of unacknowledged messages for the subscription. */
public long unackedMessages;
- /** Whether this subscription is Exclusive or Shared or Failover */
+ /** Whether this subscription is Exclusive or Shared or Failover. */
public SubType type;
- /** The name of the consumer that is active for single active consumer subscriptions i.e. failover or exclusive */
+ /** The name of the consumer that is active for single active consumer subscriptions i.e. failover or exclusive. */
public String activeConsumerName;
- /** Total rate of messages expired on this subscription. msg/s */
+ /** Total rate of messages expired on this subscription (msg/s). */
public double msgRateExpired;
- /** List of connected consumers on this subscription w/ their stats */
+ /** List of connected consumers on this subscription w/ their stats. */
public List<ConsumerStats> consumers;
- /** Mark that the subscription state is kept in sync across different regions */
+ /** Mark that the subscription state is kept in sync across different regions. */
public boolean isReplicated;
public SubscriptionStats() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantInfo.java
index 824acb6..1c7ef39 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantInfo.java
@@ -26,11 +26,14 @@ import java.util.Set;
import lombok.Data;
+/**
+ * Information of admin roles and allowed clusters for tenant.
+ */
@Data
@ApiModel(value = "TenantInfo", description = "Information of adminRoles and allowedClusters for tenant")
public class TenantInfo {
/**
- * List of role enabled as admin for this tenant
+ * List of roles enabled as admin for this tenant.
*/
@ApiModelProperty(
value = "Comma separated list of auth principal allowed to administrate the tenant.",
@@ -39,7 +42,7 @@ public class TenantInfo {
private Set<String> adminRoles;
/**
- * List of clusters this tenant is restricted on
+ * List of clusters this tenant is restricted on.
*/
@ApiModelProperty(
value = "Comma separated allowed clusters.",
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index e2f1802..bbce456 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -20,44 +20,44 @@ package org.apache.pulsar.common.policies.data;
import static com.google.common.base.Preconditions.checkNotNull;
-import java.util.List;
-import java.util.Map;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
/**
+ * Statistics for a Pulsar topic.
*/
public class TopicStats {
private int count;
- /** Total rate of messages published on the topic. msg/s */
+ /** Total rate of messages published on the topic (msg/s). */
public double msgRateIn;
- /** Total throughput of messages published on the topic. byte/s */
+ /** Total throughput of messages published on the topic (byte/s). */
public double msgThroughputIn;
- /** Total rate of messages dispatched for the topic. msg/s */
+ /** Total rate of messages dispatched for the topic (msg/s). */
public double msgRateOut;
- /** Total throughput of messages dispatched for the topic. byte/s */
+ /** Total throughput of messages dispatched for the topic (byte/s). */
public double msgThroughputOut;
- /** Average size of published messages. bytes */
+ /** Average size of published messages (bytes). */
public double averageMsgSize;
- /** Space used to store the messages for the topic. bytes */
+ /** Space used to store the messages for the topic (bytes). */
public long storageSize;
- /** List of connected publishers on this topic w/ their stats */
+ /** List of connected publishers on this topic w/ their stats. */
public List<PublisherStats> publishers;
- /** Map of subscriptions with their individual statistics */
+ /** Map of subscriptions with their individual statistics. */
public Map<String, SubscriptionStats> subscriptions;
- /** Map of replication statistics by remote cluster context */
+ /** Map of replication statistics by remote cluster context. */
public Map<String, ReplicatorStats> replication;
-
+
public String deduplicationStatus;
public TopicStats() {
@@ -81,7 +81,7 @@ public class TopicStats {
}
// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
- // stats
+ // stats.
public TopicStats add(TopicStats stats) {
checkNotNull(stats);
this.count++;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java
index 86079bb..8781898 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java
@@ -19,11 +19,15 @@
package org.apache.pulsar.common.policies.data;
import lombok.Data;
-import org.apache.pulsar.common.policies.data.FunctionStats;
+/**
+ * Data class holding statistics about a function.
+ */
@Data
public class WorkerFunctionInstanceStats {
- /** fully qualified function instance name **/
+ /**
+ * Fully qualified function instance name.
+ */
public String name;
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/package-info.java
similarity index 87%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/package-info.java
index 1151443..3fa83b4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Definition of different data structures for policies.
+ */
+package org.apache.pulsar.common.policies.data;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/AutoFailoverPolicyFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/AutoFailoverPolicyFactory.java
index 3f28308..7a3f371 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/AutoFailoverPolicyFactory.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/AutoFailoverPolicyFactory.java
@@ -22,6 +22,9 @@ import org.apache.pulsar.common.policies.AutoFailoverPolicy;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+/**
+ * Factory to generate an instance of {@link AutoFailoverPolicy}.
+ */
public class AutoFailoverPolicyFactory {
public static AutoFailoverPolicy create(AutoFailoverPolicyData policyData) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/MinAvailablePolicy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/MinAvailablePolicy.java
index 6763a44..fb90cea 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/MinAvailablePolicy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/MinAvailablePolicy.java
@@ -20,21 +20,24 @@ package org.apache.pulsar.common.policies.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Objects;
import java.util.SortedSet;
-
import org.apache.pulsar.common.policies.AutoFailoverPolicy;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BrokerStatus;
-import com.google.common.base.Objects;
-
+/**
+ * Implementation of min available policy.
+ */
public class MinAvailablePolicy extends AutoFailoverPolicy {
private static final String MIN_LIMIT_KEY = "min_limit";
private static final String USAGE_THRESHOLD_KEY = "usage_threshold";
private static final int MAX_USAGE_THRESHOLD = 100;
+ @SuppressWarnings("checkstyle:MemberName")
public int min_limit;
+ @SuppressWarnings("checkstyle:MemberName")
public int usage_threshold;
MinAvailablePolicy(int minLimit, int usageThreshold) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicies.java
index f369c5d..041f176 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicies.java
@@ -28,6 +28,9 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BrokerStatus;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+/**
+ * Namespace isolation policies.
+ */
public class NamespaceIsolationPolicies {
private Map<String, NamespaceIsolationData> policies = null;
@@ -41,8 +44,8 @@ public class NamespaceIsolationPolicies {
}
/**
- * Access method to get the namespace isolation policy by the policy name
- *
+ * Access method to get the namespace isolation policy by the policy name.
+ *
* @param policyName
* @return
*/
@@ -54,13 +57,11 @@ public class NamespaceIsolationPolicies {
}
/**
- * Get the namespace isolation policy for the specified namespace
- *
- * <p>
- * There should only be one namespace isolation policy defined for the specific namespace. If multiple policies
+ * Get the namespace isolation policy for the specified namespace.
+ *
+ * <p>There should only be one namespace isolation policy defined for the specific namespace. If multiple policies
* match, the first one will be returned.
- * <p>
- *
+ *
* @param namespace
* @return
*/
@@ -83,8 +84,8 @@ public class NamespaceIsolationPolicies {
}
/**
- * Set the policy data for a single policy
- *
+ * Set the policy data for a single policy.
+ *
* @param policyName
* @param policyData
*/
@@ -94,8 +95,8 @@ public class NamespaceIsolationPolicies {
}
/**
- * Delete a policy
- *
+ * Delete a policy.
+ *
* @param policyName
*/
public void deletePolicy(String policyName) {
@@ -103,8 +104,8 @@ public class NamespaceIsolationPolicies {
}
/**
- * Get the full policy map
- *
+ * Get the full policy map.
+ *
* @return All policy data in a map
*/
public Map<String, NamespaceIsolationData> getPolicies() {
@@ -112,8 +113,8 @@ public class NamespaceIsolationPolicies {
}
/**
- * Check to see whether a broker is in the shared broker pool or not
- *
+ * Check to see whether a broker is in the shared broker pool or not.
+ *
* @param host
* @return
*/
@@ -129,9 +130,9 @@ public class NamespaceIsolationPolicies {
}
/**
- * Get the broker assignment based on the namespace name
- *
- * @param nsname
+ * Get the broker assignment based on the namespace name.
+ *
+ * @param nsPolicy
* The namespace name
* @param brokerAddress
* The broker address is in the format of host:port
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
index c68e0e5..b33d924 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
@@ -18,26 +18,27 @@
*/
package org.apache.pulsar.common.policies.impl;
+import com.google.common.base.Objects;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
-
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.AutoFailoverPolicy;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerStatus;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
-import com.google.common.base.Objects;
-
+/**
+ * Implementation of the namespace isolation policy.
+ */
public class NamespaceIsolationPolicyImpl implements NamespaceIsolationPolicy {
private List<String> namespaces;
private List<String> primary;
private List<String> secondary;
- private AutoFailoverPolicy auto_failover_policy;
+ private AutoFailoverPolicy autoFailoverPolicy;
private boolean matchNamespaces(String fqnn) {
for (String nsRegex : namespaces) {
@@ -62,7 +63,7 @@ public class NamespaceIsolationPolicyImpl implements NamespaceIsolationPolicy {
this.namespaces = policyData.namespaces;
this.primary = policyData.primary;
this.secondary = policyData.secondary;
- this.auto_failover_policy = AutoFailoverPolicyFactory.create(policyData.auto_failover_policy);
+ this.autoFailoverPolicy = AutoFailoverPolicyFactory.create(policyData.auto_failover_policy);
}
@Override
@@ -121,7 +122,7 @@ public class NamespaceIsolationPolicyImpl implements NamespaceIsolationPolicy {
@Override
public int hashCode() {
return Objects.hashCode(namespaces, primary, secondary,
- auto_failover_policy);
+ autoFailoverPolicy);
}
@Override
@@ -130,7 +131,7 @@ public class NamespaceIsolationPolicyImpl implements NamespaceIsolationPolicy {
NamespaceIsolationPolicyImpl other = (NamespaceIsolationPolicyImpl) obj;
return Objects.equal(this.namespaces, other.namespaces) && Objects.equal(this.primary, other.primary)
&& Objects.equal(this.secondary, other.secondary)
- && Objects.equal(this.auto_failover_policy, other.auto_failover_policy);
+ && Objects.equal(this.autoFailoverPolicy, other.autoFailoverPolicy);
}
return false;
@@ -140,7 +141,7 @@ public class NamespaceIsolationPolicyImpl implements NamespaceIsolationPolicy {
public SortedSet<BrokerStatus> getAvailablePrimaryBrokers(SortedSet<BrokerStatus> primaryCandidates) {
SortedSet<BrokerStatus> availablePrimaries = new TreeSet<BrokerStatus>();
for (BrokerStatus status : primaryCandidates) {
- if (this.auto_failover_policy.isBrokerAvailable(status)) {
+ if (this.autoFailoverPolicy.isBrokerAvailable(status)) {
availablePrimaries.add(status);
}
}
@@ -149,22 +150,22 @@ public class NamespaceIsolationPolicyImpl implements NamespaceIsolationPolicy {
@Override
public boolean shouldFailover(SortedSet<BrokerStatus> brokerStatus) {
- return this.auto_failover_policy.shouldFailoverToSecondary(brokerStatus);
+ return this.autoFailoverPolicy.shouldFailoverToSecondary(brokerStatus);
}
public boolean shouldFailover(int totalPrimaryResourceUnits) {
- return this.auto_failover_policy.shouldFailoverToSecondary(totalPrimaryResourceUnits);
+ return this.autoFailoverPolicy.shouldFailoverToSecondary(totalPrimaryResourceUnits);
}
@Override
public boolean isPrimaryBrokerAvailable(BrokerStatus brkStatus) {
return this.isPrimaryBroker(brkStatus.getBrokerAddress())
- && this.auto_failover_policy.isBrokerAvailable(brkStatus);
+ && this.autoFailoverPolicy.isBrokerAvailable(brkStatus);
}
@Override
public String toString() {
return String.format("namespaces=%s primary=%s secondary=%s auto_failover_policy=%s", namespaces, primary,
- secondary, auto_failover_policy);
+ secondary, autoFailoverPolicy);
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/package-info.java
similarity index 90%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/package-info.java
index 1151443..47e3597 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.common.policies.impl;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/package-info.java
similarity index 91%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/policies/package-info.java
index 1151443..6a24f34 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Common policies.
+ */
+package org.apache.pulsar.common.policies;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
index 1953482..af0dafe 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
@@ -54,9 +54,9 @@ public final class ByteBufPair extends AbstractReferenceCounted {
/**
* Get a new {@link ByteBufPair} from the pool and assign 2 buffers to it.
- * <p>
- * The buffers b1 and b2 lifecycles are now managed by the ByteBufPair: when the {@link ByteBufPair} is deallocated,
- * b1 and b2 will be released as well.
+ *
+ * <p>The buffers b1 and b2 lifecycles are now managed by the ByteBufPair:
+ * when the {@link ByteBufPair} is deallocated, b1 and b2 will be released as well.
*
* @param b1
* @param b2
@@ -112,6 +112,7 @@ public final class ByteBufPair extends AbstractReferenceCounted {
public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
@Sharable
+ @SuppressWarnings("checkstyle:JavadocType")
public static class Encoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
@@ -134,6 +135,7 @@ public final class ByteBufPair extends AbstractReferenceCounted {
}
@Sharable
+ @SuppressWarnings("checkstyle:JavadocType")
public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/CommandUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/CommandUtils.java
index 609ff72..df466ad 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/CommandUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/CommandUtils.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.common.protocol;
-import org.apache.pulsar.common.api.proto.PulsarApi;
-
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+/**
+ * Helper class to work with commands.
+ */
public final class CommandUtils {
private CommandUtils() {}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 1386879..2262225 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -25,20 +25,16 @@ import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyF
import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8;
import com.google.common.annotations.VisibleForTesting;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
@@ -118,6 +114,7 @@ import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
@UtilityClass
@Slf4j
+@SuppressWarnings("checkstyle:JavadocType")
public class Commands {
// default message size for transfer
@@ -125,6 +122,7 @@ public class Commands {
public static final int MESSAGE_SIZE_FRAME_PADDING = 10 * 1024;
public static final int INVALID_MAX_MESSAGE_SIZE = -1;
+ @SuppressWarnings("checkstyle:ConstantName")
public static final short magicCrc32c = 0x0e01;
private static final int checksumSize = 4;
@@ -134,7 +132,8 @@ public class Commands {
}
public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker) {
- return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker, null, null, null);
+ return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker, null, null,
+ null);
}
public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker,
@@ -262,7 +261,8 @@ public class Commands {
.build())
.build();
- ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.AUTH_CHALLENGE).setAuthChallenge(challenge));
+ ByteBuf res = serializeWithSize(
+ BaseCommand.newBuilder().setType(Type.AUTH_CHALLENGE).setAuthChallenge(challenge));
challenge.recycle();
challengeBuilder.recycle();
return res;
@@ -304,7 +304,8 @@ public class Commands {
return newProducerSuccess(requestId, producerName, -1, schemaVersion);
}
- public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId, SchemaVersion schemaVersion) {
+ public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId,
+ SchemaVersion schemaVersion) {
CommandProducerSuccess.Builder producerSuccessBuilder = CommandProducerSuccess.newBuilder();
producerSuccessBuilder.setRequestId(requestId);
producerSuccessBuilder.setProducerName(producerName);
@@ -371,7 +372,7 @@ public class Commands {
/**
* Read the checksum and advance the reader index in the buffer.
*
- * Note: This method assume the checksum presence was already verified before.
+ * <p>Note: This method assumes the checksum presence was already verified before.
*/
public static int readChecksum(ByteBuf buffer) {
buffer.skipBytes(2); //skip magic bytes
@@ -386,8 +387,8 @@ public class Commands {
public static MessageMetadata parseMessageMetadata(ByteBuf buffer) {
try {
- // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata to parse
- // metadata
+ // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata
+ // to parse metadata
skipChecksumIfPresent(buffer);
int metadataSize = (int) buffer.readUnsignedInt();
@@ -413,7 +414,8 @@ public class Commands {
buffer.skipBytes(metadataSize);
}
- public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, int redeliveryCount, ByteBuf metadataAndPayload) {
+ public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, int redeliveryCount,
+ ByteBuf metadataAndPayload) {
CommandMessage.Builder msgBuilder = CommandMessage.newBuilder();
msgBuilder.setConsumerId(consumerId);
msgBuilder.setMessageId(messageId);
@@ -670,8 +672,8 @@ public class Commands {
}
public static ByteBuf newPartitionMetadataResponse(ServerError error, String errorMsg, long requestId) {
- CommandPartitionedTopicMetadataResponse.Builder partitionMetadataResponseBuilder = CommandPartitionedTopicMetadataResponse
- .newBuilder();
+ CommandPartitionedTopicMetadataResponse.Builder partitionMetadataResponseBuilder =
+ CommandPartitionedTopicMetadataResponse.newBuilder();
partitionMetadataResponseBuilder.setRequestId(requestId);
partitionMetadataResponseBuilder.setError(error);
partitionMetadataResponseBuilder.setResponse(CommandPartitionedTopicMetadataResponse.LookupType.Failed);
@@ -700,8 +702,8 @@ public class Commands {
}
public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
- CommandPartitionedTopicMetadataResponse.Builder partitionMetadataResponseBuilder = CommandPartitionedTopicMetadataResponse
- .newBuilder();
+ CommandPartitionedTopicMetadataResponse.Builder partitionMetadataResponseBuilder =
+ CommandPartitionedTopicMetadataResponse.newBuilder();
partitionMetadataResponseBuilder.setPartitions(partitions);
partitionMetadataResponseBuilder.setResponse(CommandPartitionedTopicMetadataResponse.LookupType.Success);
partitionMetadataResponseBuilder.setRequestId(requestId);
@@ -756,7 +758,8 @@ public class Commands {
connectionBuilder.setResponse(LookupType.Failed);
CommandLookupTopicResponse connectionBroker = connectionBuilder.build();
- ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP_RESPONSE).setLookupTopicResponse(connectionBroker));
+ ByteBuf res = serializeWithSize(
+ BaseCommand.newBuilder().setType(Type.LOOKUP_RESPONSE).setLookupTopicResponse(connectionBroker));
connectionBuilder.recycle();
connectionBroker.recycle();
return res;
@@ -995,7 +998,8 @@ public class Commands {
}
public static ByteBuf newGetLastMessageIdResponse(long requestId, MessageIdData messageIdData) {
- PulsarApi.CommandGetLastMessageIdResponse.Builder response = PulsarApi.CommandGetLastMessageIdResponse.newBuilder()
+ PulsarApi.CommandGetLastMessageIdResponse.Builder response =
+ PulsarApi.CommandGetLastMessageIdResponse.newBuilder()
.setLastMessageId(messageIdData)
.setRequestId(requestId);
@@ -1325,9 +1329,10 @@ public class Commands {
int payloadSize = payload.readableBytes();
int magicAndChecksumLength = ChecksumType.Crc32c.equals(checksumType) ? (2 + 4 /* magic + checksumLength*/) : 0;
boolean includeChecksum = magicAndChecksumLength > 0;
- int headerContentSize = 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize; // cmdLength + cmdSize + magicLength +
- // checksumSize + msgMetadataLength +
- // msgMetadataSize
+ // cmdLength + cmdSize + magicLength +
+ // checksumSize + msgMetadataLength +
+ // msgMetadataSize
+ int headerContentSize = 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize;
int totalSize = headerContentSize + payloadSize;
int headersSize = 4 + headerContentSize; // totalSize + headerLength
int checksumReaderIndex = -1;
@@ -1581,7 +1586,10 @@ public class Commands {
return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber();
}
- public static enum ChecksumType {
+ /**
+ * Definition of possible checksum types.
+ */
+ public enum ChecksumType {
Crc32c,
None;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index 597ae9b..95a4f23 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -20,17 +20,13 @@ package org.apache.pulsar.common.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
-
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
-
import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;
-
-import org.apache.pulsar.common.api.proto.PulsarMarkers;
-import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ClusterMessageId;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
@@ -38,10 +34,12 @@ import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsS
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsUpdate;
+import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
@UtilityClass
+@SuppressWarnings("checkstyle:JavadocType")
public class Markers {
private static ByteBuf newMessage(MarkerType markerType, Optional<String> restrictToCluster, ByteBuf payload) {
@@ -213,7 +211,8 @@ public class Markers {
}
@SneakyThrows
- public static ByteBuf newReplicatedSubscriptionsUpdate(String subscriptionName, Map<String, MessageIdData> clusterIds) {
+ public static ByteBuf newReplicatedSubscriptionsUpdate(String subscriptionName,
+ Map<String, MessageIdData> clusterIds) {
ReplicatedSubscriptionsUpdate.Builder builder = ReplicatedSubscriptionsUpdate.newBuilder();
builder.setSubscriptionName(subscriptionName);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 904cd95..31c40c4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -76,6 +76,9 @@ import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Basic implementation of the channel handler to process inbound Pulsar data.
+ */
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
@Override
@@ -513,11 +516,11 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
}
protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException();
}
protected void handleConsumerStatsResponse(CommandConsumerStatsResponse commandConsumerStatsResponse) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException();
}
protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEndOfTopic) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index c83bc72..7a8d5c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -18,18 +18,19 @@
*/
package org.apache.pulsar.common.protocol;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
-
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPing;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPong;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.ScheduledFuture;
-
+/**
+ * Implementation of the channel handler to process inbound Pulsar data.
+ */
public abstract class PulsarHandler extends PulsarDecoder {
protected ChannelHandlerContext ctx;
protected SocketAddress remoteAddress;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/package-info.java
similarity index 91%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/protocol/package-info.java
index 1151443..cf8d966 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Common protocol.
+ */
+package org.apache.pulsar.common.protocol;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java
index 4b3ba78..fec2e1a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
import java.util.Comparator;
/**
- * Bytes schema version
+ * Bytes schema version.
*/
public class BytesSchemaVersion implements SchemaVersion, Comparable<BytesSchemaVersion> {
@@ -106,7 +106,7 @@ public class BytesSchemaVersion implements SchemaVersion, Comparable<BytesSchema
* characters are hex escaped in the format \\x%02X, eg:
* \x00 \x05 etc.
*
- * This function is brought from org.apache.hadoop.hbase.util.Bytes
+ * <p>This function is brought from org.apache.hadoop.hbase.util.Bytes
*
* @param b array to write out
* @param off offset to start at
@@ -116,15 +116,18 @@ public class BytesSchemaVersion implements SchemaVersion, Comparable<BytesSchema
private static String toString(final byte[] b, int off, int len) {
StringBuilder result = new StringBuilder();
- if (b == null)
+ if (b == null) {
return result.toString();
+ }
// just in case we are passed a 'len' that is > buffer length...
- if (off >= b.length)
+ if (off >= b.length) {
return result.toString();
+ }
- if (off + len > b.length)
+ if (off + len > b.length) {
len = b.length - off;
+ }
for (int i = off; i < off + len; ++i) {
int ch = b[i] & 0xFF;
@@ -144,6 +147,9 @@ public class BytesSchemaVersion implements SchemaVersion, Comparable<BytesSchema
*/
public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
+ /**
+ * This interface helps to compare byte arrays.
+ */
public interface ByteArrayComparator extends Comparator<byte[]>, Serializable {
int compare(final byte[] buffer1, int offset1, int length1,
@@ -163,9 +169,9 @@ public class BytesSchemaVersion implements SchemaVersion, Comparable<BytesSchema
final byte[] buffer2, int offset2, int length2) {
// short circuit equal case
- if (buffer1 == buffer2 &&
- offset1 == offset2 &&
- length1 == length2) {
+ if (buffer1 == buffer2
+ && offset1 == offset2
+ && length1 == length2) {
return 0;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/DeleteSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/DeleteSchemaResponse.java
index d663fb7..c74504d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/DeleteSchemaResponse.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/DeleteSchemaResponse.java
@@ -23,6 +23,9 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+/**
+ * Response for schema deletion.
+ */
@Data
@Builder
@AllArgsConstructor
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/EmptyVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/EmptyVersion.java
index 6696d4c..e8713ee 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/EmptyVersion.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/EmptyVersion.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.protocol.schema;
+/**
+ * Empty schema version.
+ */
public final class EmptyVersion implements SchemaVersion {
private static final byte[] EMPTY = new byte[]{};
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java
index e65d50f..462ae54 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.common.protocol.schema;
+import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
-import java.util.List;
-
+/**
+ * Response containing all schemas.
+ */
@Data
@Builder
@AllArgsConstructor
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetSchemaResponse.java
index 9f1bdb7..ea14f79 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetSchemaResponse.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetSchemaResponse.java
@@ -25,6 +25,9 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.common.schema.SchemaType;
+/**
+ * Response containing information about a schema.
+ */
@Data
@Builder
@AllArgsConstructor
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java
index 26ae2ef..0361e16 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java
@@ -23,6 +23,9 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+/**
+ * Response defining if a schema is compatible with existing topic's schema.
+ */
@Data
@Builder
@AllArgsConstructor
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LatestVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LatestVersion.java
index 781b57f..c07b187 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LatestVersion.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LatestVersion.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.protocol.schema;
+/**
+ * Latest schema version.
+ */
public final class LatestVersion implements SchemaVersion {
private static final byte[] EMPTY = new byte[]{};
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java
index be55083..95b5bdd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java
@@ -23,6 +23,9 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+/**
+ * Response containing the schema version field.
+ */
@Data
@Builder
@AllArgsConstructor
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/PostSchemaPayload.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/PostSchemaPayload.java
index 89668f0..cac4149 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/PostSchemaPayload.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/PostSchemaPayload.java
@@ -23,6 +23,9 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+/**
+ * Payload with information about a schema.
+ */
@Data
@AllArgsConstructor
@NoArgsConstructor
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/PostSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/PostSchemaResponse.java
index d9ae768..d206fdf 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/PostSchemaResponse.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/PostSchemaResponse.java
@@ -23,6 +23,9 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+/**
+ * Response with the version of a schema.
+ */
@Data
@Builder
@AllArgsConstructor
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
index d534811..744c88b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
@@ -26,6 +26,9 @@ import lombok.ToString;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+/**
+ * Schema data.
+ */
@Builder
@Data
@ToString
@@ -53,7 +56,7 @@ public class SchemaData {
}
/**
- * Convert a schema info to a schema data
+ * Convert a schema info to a schema data.
*
* @param schemaInfo schema info
* @return the converted schema data
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
index d53b311..c3e7fee 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.common.protocol.schema;
import java.util.Collections;
import java.util.TreeMap;
-
import lombok.experimental.UtilityClass;
-
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
+/**
+ * Class helping to initialize schemas.
+ */
@UtilityClass
public class SchemaInfoUtil {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaVersion.java
index b77e3ec..653b4da 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaVersion.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaVersion.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.protocol.schema;
+/**
+ * Schema version.
+ */
public interface SchemaVersion {
SchemaVersion Latest = new LatestVersion();
SchemaVersion Empty = new EmptyVersion();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/package-info.java
similarity index 89%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/package-info.java
index 1151443..a38c45f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes to work with schemas.
+ */
+package org.apache.pulsar.common.protocol.schema;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
index 345bae8..346dda1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
@@ -61,14 +61,14 @@ public class JAASCredentialsContainer implements Closeable {
AppConfigurationEntry[] entries = Configuration.getConfiguration()
.getAppConfigurationEntry(loginContextName);
if (entries == null) {
- final String errorMessage = "loginContext name (JAAS file section header) was null. " +
- "Please check your java.security.login.auth.config (=" +
- System.getProperty("java.security.login.auth.config") +
- ") for section header: " + this.loginContextName;
+ final String errorMessage = "loginContext name (JAAS file section header) was null. "
+ + "Please check your java.security.login.auth.config (="
+ + System.getProperty("java.security.login.auth.config")
+ + ") for section header: " + this.loginContextName;
log.error("No JAAS Configuration section header found for Client: {}", errorMessage);
throw new LoginException(errorMessage);
}
- LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
+ LoginContext loginContext = new LoginContext(loginContextName, callbackHandler);
loginContext.login();
log.info("successfully logged in.");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java
index 5f36fde..6771a23 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/KerberosName.java
@@ -19,10 +19,9 @@
package org.apache.pulsar.common.sasl;
+import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
@@ -33,10 +32,10 @@ import java.util.regex.Pattern;
* particular, it splits them apart and translates them down into local
* operating system names.
*
- * Copied from Apache ZooKeeper KerberosName.
+ * <p>Copied from Apache ZooKeeper KerberosName.
*/
public class KerberosName {
- /** The first component of the name */
+ /** The first component of the name. */
private final String serviceName;
/** The second component of the name. It may be null. */
private final String hostName;
@@ -60,8 +59,8 @@ public class KerberosName {
* A pattern for parsing an auth_to_local rule.
*/
private static final Pattern ruleParser =
- Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
- "(s/([^/]*)/([^/]*)/(g)?)?))");
+ Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"
+ + "(s/([^/]*)/([^/]*)/(g)?)?))");
/**
* A pattern that recognizes simple/non-simple names.
@@ -92,19 +91,19 @@ public class KerberosName {
kerbConf = getInstanceMethod.invoke(classRef, new Object[0]);
getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm",
new Class<?>[0]);
- return (String)getDefaultRealmMethod.invoke(kerbConf, new Object[0]);
+ return (String) getDefaultRealmMethod.invoke(kerbConf, new Object[0]);
}
static {
try {
defaultRealm = getDefaultRealm2();
} catch (Exception ke) {
- if ((System.getProperty("zookeeper.requireKerberosConfig") != null) &&
- (System.getProperty("zookeeper.requireKerberosConfig").equals("true"))) {
- throw new IllegalArgumentException("Can't get Kerberos configuration",ke);
+ if ((System.getProperty("zookeeper.requireKerberosConfig") != null)
+ && (System.getProperty("zookeeper.requireKerberosConfig").equals("true"))) {
+ throw new IllegalArgumentException("Can't get Kerberos configuration", ke);
+ } else {
+ defaultRealm = "";
}
- else
- defaultRealm="";
}
try {
// setConfiguration() will work even if the above try() fails due
@@ -112,8 +111,7 @@ public class KerberosName {
// is set to true, which would not allow execution to reach here due to the
// throwing of an IllegalArgumentException above).
setConfiguration();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new IllegalArgumentException("Could not configure Kerberos principal name mapping.");
}
}
@@ -274,14 +272,13 @@ public class KerberosName {
try {
int num = Integer.parseInt(paramNum);
if (num < 0 || num > params.length) {
- throw new BadFormatString("index " + num + " from " + format +
- " is outside of the valid range 0 to " +
- (params.length - 1));
+ throw new BadFormatString("index " + num + " from " + format
+ + " is outside of the valid range 0 to " + (params.length - 1));
}
result.append(params[num]);
} catch (NumberFormatException nfe) {
- throw new BadFormatString("bad format in username mapping in " +
- paramNum, nfe);
+ throw new BadFormatString("bad format in username mapping in "
+ + paramNum, nfe);
}
}
@@ -334,8 +331,8 @@ public class KerberosName {
}
}
if (result != null && nonSimplePattern.matcher(result).find()) {
- throw new NoMatchingRule("Non-simple name " + result +
- " after auth_to_local rule " + this);
+ throw new NoMatchingRule("Non-simple name " + result
+ + " after auth_to_local rule " + this);
}
return result;
}
@@ -366,7 +363,7 @@ public class KerberosName {
/**
* Set the static configuration to get the rules.
- * @param conf the new configuration
+ *
* @throws IOException
*/
public static void setConfiguration() throws IOException {
@@ -374,7 +371,7 @@ public class KerberosName {
rules = parseRules(ruleString);
}
- @SuppressWarnings("serial")
+ @SuppressWarnings({"serial", "checkstyle:JavadocType"})
public static class BadFormatString extends IOException {
BadFormatString(String msg) {
super(msg);
@@ -384,7 +381,7 @@ public class KerberosName {
}
}
- @SuppressWarnings("serial")
+ @SuppressWarnings({"serial", "checkstyle:JavadocType"})
public static class NoMatchingRule extends IOException {
NoMatchingRule(String msg) {
super(msg);
@@ -408,7 +405,7 @@ public class KerberosName {
} else {
params = new String[]{realm, serviceName, hostName};
}
- for(Rule r: rules) {
+ for (Rule r: rules) {
String result = r.apply(params);
if (result != null) {
return result;
@@ -419,13 +416,13 @@ public class KerberosName {
static void printRules() throws IOException {
int i = 0;
- for(Rule r: rules) {
+ for (Rule r: rules) {
System.out.println(++i + " " + r);
}
}
public static void main(String[] args) throws Exception {
- for(String arg: args) {
+ for (String arg: args) {
KerberosName name = new KerberosName(arg);
System.out.println("Name: " + name + " to " + name.getShortName());
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
index 424b02b..18a6238 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
@@ -145,8 +145,9 @@ public class TGTRefreshThread extends Thread {
nextRefreshDate = new Date(nextRefresh);
if (nextRefresh > expiry) {
Object[] logPayload = {nextRefreshDate, expiryDate};
- log.error("next refresh: {} is later than expiry {}." + " This may indicate a clock skew problem."
- + "Check that this host and the KDC's " + "hosts' clocks are in sync. Exiting refresh thread.",
+ log.error(
+ "next refresh: {} is later than expiry {}." + " This may indicate a clock skew problem."
+ + "Check that this host and the KDC's " + "hosts' clocks are in sync. Exiting refresh thread.",
logPayload);
return;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/package-info.java
similarity index 88%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/sasl/package-info.java
index 1151443..bd9f0ae 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Implementation of Simple Authentication and Security Layer.
+ */
+package org.apache.pulsar.common.sasl;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/AllocatorStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/AllocatorStats.java
index 87787dd..a07704c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/AllocatorStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/AllocatorStats.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.common.stats;
import java.util.List;
+/**
+ * Allocator statistics.
+ */
public class AllocatorStats {
public int numDirectArenas;
public int numHeapArenas;
@@ -31,6 +34,9 @@ public class AllocatorStats {
public List<PoolArenaStats> directArenas;
public List<PoolArenaStats> heapArenas;
+ /**
+ * Pool arena statistics.
+ */
public static class PoolArenaStats {
public int numTinySubpages;
public int numSmallSubpages;
@@ -56,6 +62,9 @@ public class AllocatorStats {
public long numActiveHugeAllocations;
}
+ /**
+ * Pool subpage statistics.
+ */
public static class PoolSubpageStats {
public int maxNumElements;
public int numAvailable;
@@ -63,12 +72,18 @@ public class AllocatorStats {
public int pageSize;
}
+ /**
+ * Pool chunk list statistics.
+ */
public static class PoolChunkListStats {
public int minUsage;
public int maxUsage;
public List<PoolChunkStats> chunks;
}
+ /**
+ * Pool chunk statistics.
+ */
public static class PoolChunkStats {
public int usage;
public int chunkSize;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
index 06dddcb..5eedec9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
@@ -18,17 +18,15 @@
*/
package org.apache.pulsar.common.stats;
+import com.google.common.collect.Maps;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
-@SuppressWarnings("restriction")
+@SuppressWarnings({"restriction", "checkstyle:JavadocType"})
public class JvmDefaultGCMetricsLogger implements JvmGCMetricsLogger {
private static final Logger log = LoggerFactory.getLogger(JvmDefaultGCMetricsLogger.class);
@@ -51,6 +49,9 @@ public class JvmDefaultGCMetricsLogger implements JvmGCMetricsLogger {
}
}
+ /**
+ * Metrics for the Garbage Collector.
+ */
static class GCMetrics {
volatile long accumulatedGcCount = 0;
volatile long currentGcCount = 0;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
index 56a820b..041727a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
@@ -27,6 +27,9 @@ import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Logger for the JVM G1 GC metrics.
+ */
public class JvmG1GCMetricsLogger implements JvmGCMetricsLogger {
private volatile long accumulatedYoungGcCount = 0;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmGCMetricsLogger.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmGCMetricsLogger.java
index 50ee176..16985f1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmGCMetricsLogger.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmGCMetricsLogger.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.common.stats;
/**
- *
+ *
* {@link JvmGCMetricsLogger} can be implemented for each specific GC type which retrieves GC count and pause time and
* logs it into metrics.
*
@@ -28,13 +28,13 @@ public interface JvmGCMetricsLogger {
/**
* {@link JvmGCMetricsLogger} should update the metrics with GC specific dimensions and value.
- *
+ *
* @param metrics
*/
void logMetrics(Metrics metrics);
/**
- * It will be triggered by {@link JvmMetrics} periodically to refresh stats at interval (default = 1 min)
+ * It will be triggered by {@link JvmMetrics} periodically to refresh stats at interval (default = 1 min).
*/
void refresh();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
index 177190a..93d2eec 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
@@ -20,13 +20,10 @@ package org.apache.pulsar.common.stats;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PoolChunkListMetric;
import io.netty.buffer.PoolChunkMetric;
import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.util.internal.PlatformDependent;
-
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
@@ -40,11 +37,13 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * This class is responsible for providing JVM metrics.
+ */
public class JvmMetrics {
private static final Logger log = LoggerFactory.getLogger(JvmMetrics.class);
@@ -55,7 +54,8 @@ public class JvmMetrics {
private final static Map<String, Class<? extends JvmGCMetricsLogger>> gcLoggerMap = new HashMap<>();
static {
try {
- directMemoryUsage = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
+ directMemoryUsage = io.netty.util.internal.PlatformDependent.class
+ .getDeclaredField("DIRECT_MEMORY_COUNTER");
directMemoryUsage.setAccessible(true);
} catch (Exception e) {
log.warn("Failed to access netty DIRECT_MEMORY_COUNTER field {}", e.getMessage());
@@ -111,7 +111,7 @@ public class JvmMetrics {
m.put("jvm_total_memory", r.totalMemory());
m.put("jvm_direct_memory_used", getJvmDirectMemoryUsed());
- m.put("jvm_max_direct_memory", PlatformDependent.maxDirectMemory());
+ m.put("jvm_max_direct_memory", io.netty.util.internal.PlatformDependent.maxDirectMemory());
m.put("jvm_thread_cnt", getThreadCount());
this.gcLogger.logMetrics(m);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java
index fe52ee8..88111e3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/Metrics.java
@@ -18,28 +18,25 @@
*/
package org.apache.pulsar.common.stats;
-import java.util.Collections;
-import java.util.Map;
-
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
/**
* WARNING : do not add any getters as the Jackson parser will output that getter.
*
- * You may want to use the ignore annotation provided by jackson parser if you need some getters.
- *
- * Dimensions map should be unmodifiable and immutable
- *
+ * <p>You may want to use the ignore annotation provided by jackson parser if you need some getters.
*
+ * <p>Dimensions map should be unmodifiable and immutable.
*/
public class Metrics {
final Map<String, Object> metrics;
- @JsonInclude(content=Include.NON_EMPTY)
+ @JsonInclude(content = Include.NON_EMPTY)
final Map<String, String> dimensions;
public Metrics() {
@@ -54,11 +51,9 @@ public class Metrics {
}
/**
- * Creates a metrics object with the dimensions map immutable
+ * Creates a metrics object with the dimensions map immutable.
*
- * @param application
- * @param timestamp
- * @param dimensionsMap
+ * @param dimensionMap
* @return
*/
public static Metrics create(Map<String, String> dimensionMap) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/package-info.java
similarity index 88%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/stats/package-info.java
index 1151443..f7d5818 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Definition of classes responsible for statistics.
+ */
+package org.apache.pulsar.common.stats;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
index c9b8e5d..48ac937 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
@@ -18,16 +18,15 @@
*/
package org.apache.pulsar.common.util;
+import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
-
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.handler.ssl.SslContext;
-
+@SuppressWarnings("checkstyle:JavadocType")
public class ClientSslContextRefresher {
private volatile SslContext sslContext;
private boolean tlsAllowInsecureConnection;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java
index d4c6e1a..4d0dd94 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java
@@ -24,7 +24,7 @@ import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
/**
- * Date-time String formatter utility class
+ * Date-time String formatter utility class.
*/
public class DateFormatter {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java
index 4716254..3e888f4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java
@@ -25,6 +25,7 @@ import java.security.GeneralSecurityException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
+@SuppressWarnings("checkstyle:JavadocType")
public class DefaultSslContextBuilder extends SslContextAutoRefreshBuilder<SSLContext> {
private volatile SSLContext sslContext;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index b8660b5..f70e812 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -18,10 +18,13 @@
*/
package org.apache.pulsar.common.util;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
+import com.fasterxml.jackson.databind.util.EnumResolver;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
@@ -33,20 +36,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-
import org.apache.commons.lang3.StringUtils;
-import com.fasterxml.jackson.databind.util.EnumResolver;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import io.netty.util.internal.StringUtil;
-
/**
- *
* Generic value converter.
- * <p>
- * <h3>Use examples</h3>
+ *
+ * <p><h3>Use examples</h3>
*
* <pre>
* String o1 = String.valueOf(1);
@@ -170,14 +165,15 @@ public final class FieldParser {
Type fieldType = field.getGenericType();
if (fieldType instanceof ParameterizedType) {
Class<?> clazz = (Class<?>) ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[0];
- // convert to list
if (field.getType().equals(List.class)) {
+ // convert to list
return stringToList(strValue, clazz);
- } // convert to set
- else if (field.getType().equals(Set.class)) {
+ } else if (field.getType().equals(Set.class)) {
+ // convert to set
return stringToSet(strValue, clazz);
} else if (field.getType().equals(Map.class)) {
- Class<?> valueClass = (Class<?>) ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[1];
+ Class<?> valueClass =
+ (Class<?>) ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[1];
return stringToMap(strValue, clazz, valueClass);
} else if (field.getType().equals(Optional.class)) {
Type typeClazz = ((ParameterizedType) fieldType).getActualTypeArguments()[0];
@@ -196,8 +192,8 @@ public final class FieldParser {
}
/**
- * Sets the empty/null value if field is allowed to be set empty
- *
+ * Sets the empty/null value if field is allowed to be set empty.
+ *
* @param strValue
* @param field
* @param obj
@@ -254,13 +250,13 @@ public final class FieldParser {
/**
* Converts String to Integer.
*
- * @param value
+ * @param val
* The String to be converted.
* @return The converted Integer value.
*/
public static Integer stringToInteger(String val) {
String v = trim(val);
- if (StringUtil.isNullOrEmpty(v)) {
+ if (io.netty.util.internal.StringUtil.isNullOrEmpty(v)) {
return null;
} else {
return Integer.valueOf(v);
@@ -270,7 +266,7 @@ public final class FieldParser {
/**
* Converts String to Long.
*
- * @param value
+ * @param val
* The String to be converted.
* @return The converted Long value.
*/
@@ -281,13 +277,13 @@ public final class FieldParser {
/**
* Converts String to Double.
*
- * @param value
+ * @param val
* The String to be converted.
* @return The converted Double value.
*/
public static Double stringToDouble(String val) {
String v = trim(val);
- if (StringUtil.isNullOrEmpty(v)) {
+ if (io.netty.util.internal.StringUtil.isNullOrEmpty(v)) {
return null;
} else {
return Double.valueOf(v);
@@ -297,7 +293,7 @@ public final class FieldParser {
/**
* Converts String to float.
*
- * @param value
+ * @param val
* The String to be converted.
* @return The converted Double value.
*/
@@ -306,13 +302,13 @@ public final class FieldParser {
}
/**
- * Converts comma separated string to List
+ * Converts comma separated string to List.
*
* @param <T>
* type of list
- * @param value
+ * @param val
* comma separated values.
- * @return The converted list with type <T>.
+ * @return The converted list with type {@code <T>}.
*/
public static <T> List<T> stringToList(String val, Class<T> type) {
String[] tokens = trim(val).split(",");
@@ -322,13 +318,13 @@ public final class FieldParser {
}
/**
- * Converts comma separated string to Set
+ * Converts comma separated string to Set.
*
* @param <T>
* type of set
- * @param value
+ * @param val
* comma separated values.
- * @return The converted set with type <T>.
+ * @return The converted set with type {@code <T>}.
*/
public static <T> Set<T> stringToSet(String val, Class<T> type) {
String[] tokens = trim(val).split(",");
@@ -348,7 +344,7 @@ public final class FieldParser {
}
return map;
}
-
+
private static String trim(String val) {
checkNotNull(val);
return val.trim();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
index 44424a3..d269624 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
@@ -23,12 +23,13 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
-
+import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import lombok.Getter;
-
+/**
+ * Class working with file's modified time.
+ */
public class FileModifiedTimeUpdater {
@Getter
String fileName;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index e83167a..b86ee10 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -22,10 +22,13 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+/**
+ * This class is aimed at simplifying work with {@code CompletableFuture}.
+ */
public class FutureUtil {
/**
- * Return a future that represents the completion of the futures in the provided list
+ * Return a future that represents the completion of the futures in the provided list.
*
* @param futures
* @return
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Hash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Hash.java
index ac7d0b0..58fb27c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Hash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Hash.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.common.util;
+/**
+ * This interface declares a hash function.
+ */
public interface Hash {
/**
- * Generate the hash of a given byte array
+ * Generate the hash of a given byte array.
*
- * @return The hash of {@param b}, which is non-negative integer.
+ * @return The hash of {@code b}, which is non-negative integer.
*/
int makeHash(byte[] b);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyStoreHolder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyStoreHolder.java
index 809b3c8..6284ec4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyStoreHolder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyStoreHolder.java
@@ -25,6 +25,11 @@ import java.security.KeyStoreException;
import java.security.PrivateKey;
import java.security.cert.Certificate;
+/**
+ * Holder for the secure key store.
+ *
+ * @see java.security.KeyStore
+ */
public class KeyStoreHolder {
private KeyStore keyStore = null;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Murmur3_32Hash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Murmur3_32Hash.java
index dd0c5ab..e9510b3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Murmur3_32Hash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Murmur3_32Hash.java
@@ -23,11 +23,14 @@
*/
package org.apache.pulsar.common.util;
+import com.google.common.primitives.UnsignedBytes;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import com.google.common.primitives.UnsignedBytes;
-
+/**
+ * Implementation of the MurmurHash3 non-cryptographic hash function.
+ */
+@SuppressWarnings("checkstyle:TypeName")
public class Murmur3_32Hash implements Hash {
private static final Murmur3_32Hash instance = new Murmur3_32Hash();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NamespaceBundleStatsComparator.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NamespaceBundleStatsComparator.java
index bbab28e..34c5b62 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NamespaceBundleStatsComparator.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NamespaceBundleStatsComparator.java
@@ -22,7 +22,6 @@ import java.util.Comparator;
import java.util.Map;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
-import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
/**
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java
index e1ee4ab..713c52d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java
@@ -18,15 +18,16 @@
*/
package org.apache.pulsar.common.util;
+import io.netty.handler.ssl.SslContext;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Set;
-
import javax.net.ssl.SSLException;
-import io.netty.handler.ssl.SslContext;
-
+/**
+ * SSL context builder for Netty.
+ */
public class NettySslContextBuilder extends SslContextAutoRefreshBuilder<SslContext> {
private volatile SslContext sslNettyContext;
@@ -38,7 +39,8 @@ public class NettySslContextBuilder extends SslContextAutoRefreshBuilder<SslCont
}
@Override
- public synchronized SslContext update() throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
+ public synchronized SslContext update()
+ throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
this.sslNettyContext = SecurityUtility.createNettySslContextForServer(tlsAllowInsecureConnection,
tlsTrustCertsFilePath.getFileName(), tlsCertificateFilePath.getFileName(), tlsKeyFilePath.getFileName(),
tlsCiphers, tlsProtocols, tlsRequireTrustedClientCertOnConnect);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
index eaf3664..502af82 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.common.util;
import io.netty.buffer.ByteBuf;
+/**
+ * Custom number formatter for {@code io.netty.buffer.ByteBuf}.
+ */
public class NumberFormat {
static void format(ByteBuf out, long num) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index 650c3ca..56dfa7e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.netty.util.concurrent.FastThreadLocal;
+@SuppressWarnings("checkstyle:JavadocType")
public class ObjectMapperFactory {
public static ObjectMapper create() {
ObjectMapper mapper = new ObjectMapper();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index e2aacdd..006cddd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -20,38 +20,32 @@ package org.apache.pulsar.common.util;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.MoreObjects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.MoreObjects;
-
/**
- *
* A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
* permit is available, and then takes it. Each {@link #tryAcquire()} tries to acquire permits from available permits,
* it returns true if it succeed else returns false. Rate limiter release configured permits at every configured rate
* time, so, on next ticket new fresh permits will be available.
- * <p>
- * For example: if RateLimiter is configured to release 10 permits at every 1 second then RateLimiter will allow to
+ *
+ * <p>For example: if RateLimiter is configured to release 10 permits at every 1 second then RateLimiter will allow to
* acquire 10 permits at any time with in that 1 second.
- * <p>
- * <b>comparison with other RateLimiter such as {@link com.google.common.util.concurrent.RateLimiter}</b>
- * </p>
+ *
+ * <p>Comparison with other RateLimiter such as {@link com.google.common.util.concurrent.RateLimiter}
* <ul>
- * <li><b>Per second rate-limiting:</b> Per second rate-limiting not satisfied by Guava-RateLimiter
- * <p>
- * <b>Guava RateLimiter:</b> For X permits: it releases X/1000 permits every msec. therefore, for permits=2/sec => it
- * release 1st permit on first 500msec and 2nd permit on next 500ms. therefore, if 2 request comes with in 500msec
- * duration then 2nd request fails to acquire permit though we have configured 2 permits/second.
- * <p>
- * <b>RateLimiter:</b> it releases X permits every second. so, in above usecase: if 2 requests comes at the same time
- * then both will acquire the permit.
+ * <li><b>Per second rate-limiting:</b> Per second rate-limiting not satisfied by Guava-RateLimiter</li>
+ * <li><b>Guava RateLimiter:</b> For X permits: it releases X/1000 permits every msec. therefore,
+ * for permits=2/sec => it release 1st permit on first 500msec and 2nd permit on next 500ms. therefore,
+ * if 2 request comes with in 500msec duration then 2nd request fails to acquire permit
+ * though we have configured 2 permits/second.</li>
+ * <li><b>RateLimiter:</b> it releases X permits every second. so, in above usecase:
+ * if 2 requests comes at the same time then both will acquire the permit.</li>
* <li><b>Faster: </b>RateLimiter is light-weight and faster than Guava-RateLimiter</li>
* </ul>
- *
- *
*/
public class RateLimiter implements AutoCloseable{
@@ -110,10 +104,7 @@ public class RateLimiter implements AutoCloseable{
/**
* Acquires the given number of permits from this {@code RateLimiter}, blocking until the request be granted.
*
- * This method is equivalent to {@code acquire(1)}.
- *
- * @param permits
- * the number of permits to acquire
+ * <p>This method is equivalent to {@code acquire(1)}.
*/
public synchronized void acquire() throws InterruptedException {
acquire(1);
@@ -122,7 +113,7 @@ public class RateLimiter implements AutoCloseable{
/**
* Acquires the given number of permits from this {@code RateLimiter}, blocking until the request be granted.
*
- * @param permits
+ * @param acquirePermit
* the number of permits to acquire
*/
public synchronized void acquire(long acquirePermit) throws InterruptedException {
@@ -149,11 +140,8 @@ public class RateLimiter implements AutoCloseable{
/**
* Acquires permits from this {@link RateLimiter} if it can be acquired immediately without delay.
*
- * <p>
- * This method is equivalent to {@code tryAcquire(1)}.
+ * <p>This method is equivalent to {@code tryAcquire(1)}.
*
- * @param permits
- * the number of permits to acquire
* @return {@code true} if the permits were acquired, {@code false} otherwise
*/
public synchronized boolean tryAcquire() {
@@ -163,7 +151,7 @@ public class RateLimiter implements AutoCloseable{
/**
* Acquires permits from this {@link RateLimiter} if it can be acquired immediately without delay.
*
- * @param permits
+ * @param acquirePermit
* the number of permits to acquire
* @return {@code true} if the permits were acquired, {@code false} otherwise
*/
@@ -187,7 +175,7 @@ public class RateLimiter implements AutoCloseable{
}
/**
- * Return available permits for this {@link RateLimiter}
+ * Return available permits for this {@link RateLimiter}.
*
* @return returns 0 if permits is not available
*/
@@ -196,7 +184,7 @@ public class RateLimiter implements AutoCloseable{
}
/**
- * Resets new rate by configuring new value for permits per configured rate-period
+ * Resets new rate by configuring new value for permits per configured rate-period.
*
* @param permits
*/
@@ -212,7 +200,7 @@ public class RateLimiter implements AutoCloseable{
* @param timeUnit
*/
public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit) {
- if(renewTask != null) {
+ if (renewTask != null) {
renewTask.cancel(false);
}
this.permits = permits;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java
index 3034960..71fe412 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java
@@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit;
import lombok.experimental.UtilityClass;
+/**
+ * Parser for relative time.
+ */
@UtilityClass
public class RelativeTimeUtil {
public static long parseRelativeTimeInSeconds(String relativeTime) {
@@ -29,7 +32,7 @@ public class RelativeTimeUtil {
throw new IllegalArgumentException("exipiry time cannot be empty");
}
- int lastIndex= relativeTime.length() - 1;
+ int lastIndex = relativeTime.length() - 1;
char lastChar = relativeTime.charAt(lastIndex);
final char timeUnit;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index b454116..f77743f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.common.util;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -41,28 +45,24 @@ import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;
import java.util.Collection;
import java.util.Set;
-
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
-
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import io.netty.handler.ssl.ClientAuth;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
-
+/**
+ * Helper class for the security domain.
+ */
public class SecurityUtility {
static {
// Fixes loading PKCS8Key file: https://stackoverflow.com/a/18912362
java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
}
-
+
public static SSLContext createSslContext(boolean allowInsecureConnection, Certificate[] trustCertificates)
throws GeneralSecurityException {
return createSslContext(allowInsecureConnection, trustCertificates, (Certificate[]) null, (PrivateKey) null);
@@ -244,7 +244,8 @@ public class SecurityUtility {
}
}
- private static void setupClientAuthentication(SslContextBuilder builder, boolean requireTrustedClientCertOnConnect) {
+ private static void setupClientAuthentication(SslContextBuilder builder,
+ boolean requireTrustedClientCertOnConnect) {
if (requireTrustedClientCertOnConnect) {
builder.clientAuth(ClientAuth.REQUIRE);
} else {
@@ -259,11 +260,11 @@ public class SecurityUtility {
SslContextFactory sslCtxFactory = null;
if (autoRefresh) {
sslCtxFactory = new SslContextFactoryWithAutoRefresh(tlsAllowInsecureConnection, tlsTrustCertsFilePath,
- tlsCertificateFilePath, tlsKeyFilePath, tlsRequireTrustedClientCertOnConnect, 0);
+ tlsCertificateFilePath, tlsKeyFilePath, tlsRequireTrustedClientCertOnConnect, 0);
} else {
sslCtxFactory = new SslContextFactory();
SSLContext sslCtx = createSslContext(tlsAllowInsecureConnection, tlsTrustCertsFilePath,
- tlsCertificateFilePath, tlsKeyFilePath);
+ tlsCertificateFilePath, tlsKeyFilePath);
sslCtxFactory.setSslContext(sslCtx);
}
if (tlsRequireTrustedClientCertOnConnect) {
@@ -274,10 +275,9 @@ public class SecurityUtility {
sslCtxFactory.setTrustAll(true);
return sslCtxFactory;
}
-
+
/**
- * {@link SslContextFactory} that auto-refresh SSLContext
- *
+ * {@link SslContextFactory} that auto-refresh SSLContext.
*/
static class SslContextFactoryWithAutoRefresh extends SslContextFactory {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java
index 8d1b472..1847dd7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java
@@ -64,7 +64,7 @@ public abstract class SslContextAutoRefreshBuilder<T> {
/**
* udpates and returns cached SSLContext.
- *
+ *
* @return
* @throws GeneralSecurityException
* @throws IOException
@@ -73,14 +73,14 @@ public abstract class SslContextAutoRefreshBuilder<T> {
/**
* Returns cached SSLContext.
- *
+ *
* @return
*/
protected abstract T getSslContext();
/**
- * It updates SSLContext at every configured refresh time and returns updated SSLContext
- *
+ * It updates SSLContext at every configured refresh time and returns updated SSLContext.
+ *
* @return
*/
public T get() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
index 739ad1e..8f80d03 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.common.util.collections;
import java.util.BitSet;
import java.util.concurrent.locks.StampedLock;
+/**
+ * Safe multithreaded version of {@code BitSet}.
+ */
public class ConcurrentBitSet extends BitSet {
private static final long serialVersionUID = 1L;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index 60c24c0..14a6fda 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -21,17 +21,16 @@ package org.apache.pulsar.common.util.collections;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
import java.util.function.LongFunction;
-import com.google.common.collect.Lists;
-
/**
* Map from long to an Object.
*
- * Provides similar methods as a ConcurrentMap<long,Object> with 2 differences:
+ * <p>Provides similar methods as a {@code ConcurrentMap<long,Object>} with 2 differences:
* <ol>
* <li>No boxing/unboxing from long -> Long
* <li>Open hash map with linear probing, no node allocations to store the values
@@ -180,7 +179,12 @@ public class ConcurrentLongHashMap<V> {
return values;
}
- public static interface EntryProcessor<V> {
+ /**
+ * Processor for one key-value entry, where the key is {@code long}.
+ *
+ * @param <V> type of the value.
+ */
+ public interface EntryProcessor<V> {
void accept(long key, V value);
}
@@ -472,7 +476,7 @@ public class ConcurrentLongHashMap<V> {
}
}
- private static final long HashMixer = 0xc6a4a7935bd1e995l;
+ private static final long HashMixer = 0xc6a4a7935bd1e995L;
private static final int R = 47;
static final long hash(long key) {
@@ -482,11 +486,11 @@ public class ConcurrentLongHashMap<V> {
return hash;
}
- static final int signSafeMod(long n, int Max) {
- return (int) n & (Max - 1);
+ static final int signSafeMod(long n, int max) {
+ return (int) n & (max - 1);
}
- private static final int alignToPowerOfTwo(int n) {
+ private static int alignToPowerOfTwo(int n) {
return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index e10e00f..5e2d499 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -26,20 +26,14 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.StampedLock;
-import java.util.function.BiFunction;
-
-import org.apache.pulsar.common.util.collections.LongPairSet.LongPairFunction;
/**
* Concurrent hash set where values are composed of pairs of longs.
*
- ** <p>
- * (long,long)
- * <p>
- * Provides similar methods as a ConcurrentHashSet<V> but since it's an open hash set with linear probing, no node
- * allocations are required to store the keys and values, and no boxing is required.
- * <p>
- * Values <strong>MUST</strong> be >= 0.
+ * <p>Provides similar methods as a {@code ConcurrentHashSet<V>} but since it's an open hash set with linear probing,
+ * no node allocations are required to store the keys and values, and no boxing is required.
+ *
+ * <p>Values <b>MUST</b> be >= 0.
*/
public class ConcurrentLongPairSet implements LongPairSet {
@@ -53,11 +47,17 @@ public class ConcurrentLongPairSet implements LongPairSet {
private final Section[] sections;
- public static interface ConsumerLong {
+ /**
+ * Represents a function that accepts an object of the {@code LongPair} type.
+ */
+ public interface ConsumerLong {
void accept(LongPair item);
}
- public static interface LongPairConsumer {
+ /**
+ * Represents a function that accepts two long arguments.
+ */
+ public interface LongPairConsumer {
void accept(long v1, long v2);
}
@@ -130,7 +130,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
}
/**
- * Remove an existing entry if found
+ * Remove an existing entry if found.
*
* @param item1
* @return true if removed or false if item was not present
@@ -141,7 +141,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
return getSection(h).remove(item1, item2, (int) h);
}
- private final Section getSection(long hash) {
+ private Section getSection(long hash) {
// Use 32 msb out of long to get the section
final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
return sections[sectionIdx];
@@ -164,7 +164,6 @@ public class ConcurrentLongPairSet implements LongPairSet {
*
* @param filter
* a predicate which returns {@code true} for elements to be removed
- * @return {@code true} if any elements were removed
*
* @return number of removed values
*/
@@ -207,7 +206,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
}
return items;
}
-
+
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section extends StampedLock {
@@ -478,7 +477,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
}
}
- private static final long HashMixer = 0xc6a4a7935bd1e995l;
+ private static final long HashMixer = 0xc6a4a7935bd1e995L;
private static final int R = 47;
final static long hash(long key1, long key2) {
@@ -491,20 +490,23 @@ public class ConcurrentLongPairSet implements LongPairSet {
return hash;
}
- static final int signSafeMod(long n, int Max) {
- return (int) (n & (Max - 1)) << 1;
+ static final int signSafeMod(long n, int max) {
+ return (int) (n & (max - 1)) << 1;
}
- private static final int alignToPowerOfTwo(int n) {
+ private static int alignToPowerOfTwo(int n) {
return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
}
- private static final void checkBiggerEqualZero(long n) {
+ private static void checkBiggerEqualZero(long n) {
if (n < 0L) {
throw new IllegalArgumentException("Keys and values must be >= 0");
}
}
+ /**
+ * Class representing two long values.
+ */
public static class LongPair implements Comparable<LongPair> {
public final long first;
public final long second;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 94f64de..880ef38 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -21,19 +21,18 @@ package org.apache.pulsar.common.util.collections;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
-import com.google.common.collect.Lists;
-
/**
- * Concurrent hash map
+ * Concurrent hash map.
*
- * Provides similar methods as a ConcurrentMap<K,V> but since it's an open hash map with linear probing, no node
- * allocations are required to store the values
+ * <p>Provides similar methods as a {@code ConcurrentMap<K,V>} but since it's an open hash map with linear probing,
+ * no node allocations are required to store the values.
*
* @param <V>
*/
@@ -431,7 +430,7 @@ public class ConcurrentOpenHashMap<K, V> {
}
}
- private static final long HashMixer = 0xc6a4a7935bd1e995l;
+ private static final long HashMixer = 0xc6a4a7935bd1e995L;
private static final int R = 47;
final static <K> long hash(K key) {
@@ -441,11 +440,11 @@ public class ConcurrentOpenHashMap<K, V> {
return hash;
}
- static final int signSafeMod(long n, int Max) {
- return (int) (n & (Max - 1)) << 1;
+ static final int signSafeMod(long n, int max) {
+ return (int) (n & (max - 1)) << 1;
}
- private static final int alignToPowerOfTwo(int n) {
+ private static int alignToPowerOfTwo(int n) {
return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 2f27913..576637d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.util.collections;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,13 +29,11 @@ import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
-import com.google.common.collect.Lists;
-
/**
- * Concurrent hash set
+ * Concurrent hash set.
*
- * Provides similar methods as a ConcurrentMap<K,V> but since it's an open hash map with linear probing, no node
- * allocations are required to store the values
+ * <p>Provides similar methods as a concurrent {@code Set<V>} but since it's an open hash set with linear probing,
+ * no node allocations are required to store the values.
*
* @param <V>
*/
@@ -359,7 +358,7 @@ public class ConcurrentOpenHashSet<V> {
}
}
- private final void cleanBucket(int bucket) {
+ private void cleanBucket(int bucket) {
int nextInArray = signSafeMod(bucket + 1, capacity);
if (values[nextInArray] == EmptyValue) {
values[bucket] = (V) EmptyValue;
@@ -450,7 +449,7 @@ public class ConcurrentOpenHashSet<V> {
}
}
- private static final long HashMixer = 0xc6a4a7935bd1e995l;
+ private static final long HashMixer = 0xc6a4a7935bd1e995L;
private static final int R = 47;
final static <K> long hash(K key) {
@@ -460,11 +459,11 @@ public class ConcurrentOpenHashSet<V> {
return hash;
}
- static final int signSafeMod(long n, int Max) {
- return (int) n & (Max - 1);
+ static final int signSafeMod(long n, int max) {
+ return (int) n & (max - 1);
}
- private static final int alignToPowerOfTwo(int n) {
+ private static int alignToPowerOfTwo(int n) {
return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index e7199f9..3af8fb7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.common.util.collections;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
@@ -29,22 +31,16 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.BoundType;
-import com.google.common.collect.Range;
-
/**
* A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of
- * {@link com.google.common.collect.RangeSet} and can be used if {@code range} type is {@link LongPair} </br>
- *
+ * {@link com.google.common.collect.RangeSet} and can be used if {@code range} type is {@link LongPair}
+ *
* <pre>
- *
* Usage:
* a. This can be used if one doesn't want to create object for every new inserted {@code range}
- * b. It creates {@link BitSet} for every unique first-key of the range.
+ * b. It creates {@link BitSet} for every unique first-key of the range.
* So, this rangeSet is not suitable for large number of unique keys.
* </pre>
- *
- *
*/
public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
@@ -77,8 +73,8 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
* Adds the specified range to this {@code RangeSet} (optional operation). That is, for equal range sets a and b,
* the result of {@code a.add(range)} is that {@code a} will be the minimal range set for which both
* {@code a.enclosesAll(b)} and {@code a.encloses(range)}.
- * <p>
- * Note that {@code range} will merge given {@code range} with any ranges in the range set that are
+ *
+ * <p>Note that {@code range} will merge given {@code range} with any ranges in the range set that are
* {@linkplain Range#isConnected(Range) connected} with it. Moreover, if {@code range} is empty, this is a no-op.
*/
@Override
@@ -274,8 +270,7 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
* the result of {@code a.add(range)} is that {@code a} will be the minimal range set for which both
* {@code a.enclosesAll(b)} and {@code a.encloses(range)}.
*
- * <p>
- * Note that {@code range} will merge given {@code range} with any ranges in the range set that are
+ * <p>Note that {@code range} will merge given {@code range} with any ranges in the range set that are
* {@linkplain Range#isConnected(Range) connected} with it. Moreover, if {@code range} is empty/invalid, this is a
* no-op.
*/
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
index 12c595c..7df8f0f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
@@ -31,20 +31,19 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPairC
/**
* Sorted concurrent {@link LongPairSet} which is not fully accurate in sorting.
- *
+ *
* {@link ConcurrentSortedLongPairSet} creates separate {@link ConcurrentLongPairSet} for unique first-key of
* inserted item. So, it can iterate over all items by sorting on item's first key. However, item's second key will not
* be sorted. eg:
- *
+ *
* <pre>
* insert: (1,2), (1,4), (2,1), (1,5), (2,6)
* while iterating set will first read all the entries for items whose first-key=1 and then first-key=2.
* output: (1,4), (1,5), (1,2), (2,6), (2,1)
* </pre>
- *
- * This map can be expensive and not recommended if set has to store large number of unique item.first's key because set
- * has to create that many {@link ConcurrentLongPairSet} objects
*
+ * <p>This set can be expensive and is not recommended if it has to store a large number of unique first keys,
+ * because it has to create that many {@link ConcurrentLongPairSet} objects.
*/
public class ConcurrentSortedLongPairSet implements LongPairSet {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
index ae6a272..f7003aa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
@@ -32,13 +32,10 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
-import io.netty.util.internal.MathUtil;
-
/**
* This implements a {@link BlockingQueue} backed by an array with no fixed capacity.
*
- * When the capacity is reached, data will be moved to a bigger array.
- *
+ * <p>When the capacity is reached, data will be moved to a bigger array.
*/
public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements BlockingQueue<T> {
@@ -64,7 +61,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
headIndex.value = 0;
tailIndex.value = 0;
- int capacity = MathUtil.findNextPositivePowerOfTwo(initialCapacity);
+ int capacity = io.netty.util.internal.MathUtil.findNextPositivePowerOfTwo(initialCapacity);
data = (T[]) new Object[capacity];
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
index 79c152a..5854892 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueue.java
@@ -24,19 +24,14 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
-import io.netty.util.internal.MathUtil;
-
/**
* An unbounded priority queue based on a min heap where values are composed of pairs of longs.
*
- * When the capacity is reached, data will be moved to a bigger array.
- *
+ * <p>When the capacity is reached, data will be moved to a bigger array.
+ *
* <b>It also act as a set and doesn't store duplicate values if {@link #allowedDuplicate} flag is passed false</b>
- *
- ** <p>
- * (long,long)
- * <p>
*
+ * <p>(long,long)
*/
public class GrowablePriorityLongPairQueue {
@@ -51,16 +46,22 @@ public class GrowablePriorityLongPairQueue {
public GrowablePriorityLongPairQueue(int initialCapacity) {
checkArgument(initialCapacity > 0);
- this.capacity = MathUtil.findNextPositivePowerOfTwo(initialCapacity);
+ this.capacity = io.netty.util.internal.MathUtil.findNextPositivePowerOfTwo(initialCapacity);
data = new long[2 * capacity];
Arrays.fill(data, 0, data.length, EmptyItem);
}
+ /**
+ * Predicate to check a key-value pair where both of them have long types.
+ */
public interface LongPairPredicate {
boolean test(long v1, long v2);
}
- public static interface LongPairConsumer {
+ /**
+ * Represents a function that accepts two long arguments.
+ */
+ public interface LongPairConsumer {
void accept(long v1, long v2);
}
@@ -119,11 +120,10 @@ public class GrowablePriorityLongPairQueue {
/**
* Removes all of the elements of this collection that satisfy the given predicate.
- *
+ *
* @param filter
* a predicate which returns {@code true} for elements to be removed
- * @return {@code true} if any elements were removed
- *
+ *
* @return number of removed values
*/
public synchronized int removeIf(LongPairPredicate filter) {
@@ -160,7 +160,7 @@ public class GrowablePriorityLongPairQueue {
/**
* It removes all occurrence of given pair from the queue.
- *
+ *
* @param item1
* @param item2
* @return
@@ -181,8 +181,8 @@ public class GrowablePriorityLongPairQueue {
}
/**
- * Removes min element from the heap
- *
+ * Removes min element from the heap.
+ *
* @return
*/
public LongPair remove() {
@@ -195,7 +195,7 @@ public class GrowablePriorityLongPairQueue {
/**
* it is not a thread-safe method and it should be called before acquiring a lock by a caller.
- *
+ *
* @param index
* @return
*/
@@ -320,7 +320,7 @@ public class GrowablePriorityLongPairQueue {
}
}
- private static final long HashMixer = 0xc6a4a7935bd1e995l;
+ private static final long HashMixer = 0xc6a4a7935bd1e995L;
private static final int R = 47;
final static long hash(long key1, long key2) {
@@ -333,6 +333,9 @@ public class GrowablePriorityLongPairQueue {
return hash;
}
+ /**
+ * Class representing two long values.
+ */
public static class LongPair implements Comparable<LongPair> {
public final long first;
public final long second;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index daad0ad..2187ffe 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -18,13 +18,12 @@
*/
package org.apache.pulsar.common.util.collections;
-import java.util.Collection;
-import java.util.Set;
-
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
+import java.util.Collection;
+import java.util.Set;
/**
* A set comprising zero or more ranges type of key-value pair.
@@ -36,9 +35,9 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
* lower} and less than or equal to {@code upper}.) to this {@code RangeSet} (optional operation). That is, for equal
* range sets a and b, the result of {@code a.add(range)} is that {@code a} will be the minimal range set for which
* both {@code a.enclosesAll(b)} and {@code a.encloses(range)}.
- *
+ *
* <pre>
- *
+ *
* @param lowerKey : value for key of lowerEndpoint of Range
* @param lowerValue: value for value of lowerEndpoint of Range
* @param upperKey : value for key of upperEndpoint of Range
@@ -58,7 +57,7 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
/**
* Remove range that contains all values less than or equal to given key-value.
- *
+ *
* @param key
* @param value
*/
@@ -70,67 +69,81 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
/**
* Returns the minimal range which {@linkplain Range#encloses(Range) encloses} all ranges in this range set.
- *
+ *
* @return
*/
Range<T> span();
/**
* Returns a view of the {@linkplain Range#isConnected disconnected} ranges that make up this range set.
- *
+ *
* @return
*/
Collection<Range<T>> asRanges();
-
+
/**
- * Performs the given action for each entry in this map until all entries have been processed or action returns "false".
- * Unless otherwise specified by the implementing class, actions are performed in the order of entry
- * set iteration (if an iteration order is specified.)
- *
+ * Performs the given action for each entry in this map until all entries have been processed
+ * or action returns "false". Unless otherwise specified by the implementing class,
+ * actions are performed in the order of entry set iteration (if an iteration order is specified.)
+ *
* @param action
*/
void forEach(RangeProcessor<T> action);
-
+
/**
- * Performs the given action for each entry in this map until all entries have been processed or action returns "false".
- * Unless otherwise specified by the implementing class, actions are performed in the order of entry
- * set iteration (if an iteration order is specified.)
- *
+ * Performs the given action for each entry in this map until all entries have been processed
+ * or action returns "false". Unless otherwise specified by the implementing class,
+ * actions are performed in the order of entry set iteration (if an iteration order is specified.)
+ *
* @param action
* @param consumer
*/
void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer);
-
+
/**
* Returns total number of ranges into the set.
- *
+ *
* @return
*/
int size();
/**
* It returns very first smallest range in the rangeSet.
- *
- * @return Range<T> first smallest range into the set
+ *
+ * @return first smallest range into the set
*/
Range<T> firstRange();
- public static interface LongPairConsumer<T> {
+ /**
+ * Represents a function that accepts two long arguments and produces a result.
+ *
+ * @param <T> the type of the result.
+ */
+ public interface LongPairConsumer<T> {
T apply(long key, long value);
}
- public static interface RangeProcessor<T extends Comparable<T>> {
+ /**
+ * The interface exposing a method for processing of ranges.
+ * @param <T> - The incoming type of data in the range object.
+ */
+ public interface RangeProcessor<T extends Comparable<T>> {
/**
- *
+ *
* @param range
* @return false if there is no further processing required
*/
boolean process(Range<T> range);
}
-
- public static class LongPair implements Comparable<LongPair> {
+ /**
+ * This class is a simple key-value data structure.
+ */
+ class LongPair implements Comparable<LongPair> {
+
+ @SuppressWarnings("checkstyle:ConstantName")
public static final LongPair earliest = new LongPair(-1, -1);
+ @SuppressWarnings("checkstyle:ConstantName")
public static final LongPair latest = new LongPair(Integer.MAX_VALUE, Integer.MAX_VALUE);
private long key;
@@ -160,7 +173,12 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
}
}
- public static class DefaultRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
+ /**
+ * Generic implementation of a default range set.
+ *
+ * @param <T> the type of values in ranges.
+ */
+ class DefaultRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
RangeSet<T> set = TreeRangeSet.create();
@@ -230,7 +248,7 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
}
}
}
-
+
@Override
public boolean contains(long key, long value) {
return this.contains(consumer.apply(key, value));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
index 15f55e3..3c6b800 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
@@ -18,9 +18,7 @@
*/
package org.apache.pulsar.common.util.collections;
-import java.util.Objects;
import java.util.Set;
-import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair;
@@ -28,13 +26,12 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPairC
/**
* Hash set where values are composed of pairs of longs.
- *
*/
public interface LongPairSet {
/**
* Adds composite value of item1 and item2 to set.
- *
+ *
* @param item1
* @param item2
* @return
@@ -43,7 +40,7 @@ public interface LongPairSet {
/**
* Removes composite value of item1 and item2 from set.
- *
+ *
* @param item1
* @param item2
* @return
@@ -52,15 +49,15 @@ public interface LongPairSet {
/**
* Removes composite value of item1 and item2 from set if provided predicate {@link LongPairPredicate} matches.
- *
+ *
* @param filter
* @return
*/
int removeIf(LongPairPredicate filter);
/**
- * execute {@link LongPairConsumer} processor for each entry in the set
- *
+ * Execute {@link LongPairConsumer} processor for each entry in the set.
+ *
* @param processor
*/
void forEach(LongPairConsumer processor);
@@ -76,19 +73,18 @@ public interface LongPairSet {
Set<LongPair> items(int numberOfItems);
/**
- * @return a new list of keys with max provided numberOfItems
- *
- * @param messagesToRead
+ *
+ * @param numberOfItems
* @param longPairConverter
* converts (long,long) pair to <T> object
- *
- * @return
+ *
+ * @return a new list of keys with max provided numberOfItems
*/
<T> Set<T> items(int numberOfItems, LongPairFunction<T> longPairConverter);
/**
* Check if set is empty.
- *
+ *
* @return
*/
boolean isEmpty();
@@ -98,27 +94,29 @@ public interface LongPairSet {
*/
void clear();
+ /**
+ * Predicate to check a key-value pair where both of them have long types.
+ */
public interface LongPairPredicate {
boolean test(long v1, long v2);
}
/**
* Returns size of the set.
- *
+ *
* @return
*/
long size();
/**
* Checks if given (item1,item2) composite value exists into set.
- *
+ *
* @param item1
* @param item2
* @return
*/
boolean contains(long item1, long item2);
-
-
+
/**
* Represents a function that accepts two long arguments and produces a result. This is the two-arity specialization
* of {@link Function}.
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 9d4b88e..6a9168a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -26,7 +26,7 @@ import io.netty.buffer.PooledByteBufAllocator;
/**
* Provides a priority-queue implementation specialized on items composed by 3 longs.
*
- * This class is not thread safe and the items are stored in direct memory.
+ * <p>This class is not thread safe and the items are stored in direct memory.
*/
public class TripleLongPriorityQueue implements AutoCloseable {
@@ -44,14 +44,14 @@ public class TripleLongPriorityQueue implements AutoCloseable {
private int size;
/**
- * Create a new priority queue with default initial capacity
+ * Create a new priority queue with default initial capacity.
*/
public TripleLongPriorityQueue() {
this(DEFAULT_INITIAL_CAPACITY);
}
/**
- * Create a new priority queue with a given initial capacity
+ * Create a new priority queue with a given initial capacity.
* @param initialCapacity
*/
public TripleLongPriorityQueue(int initialCapacity) {
@@ -61,7 +61,7 @@ public class TripleLongPriorityQueue implements AutoCloseable {
}
/**
- * Close the priority queue and free the memory associated
+ * Close the priority queue and free the memory associated.
*/
@Override
public void close() {
@@ -69,7 +69,7 @@ public class TripleLongPriorityQueue implements AutoCloseable {
}
/**
- * Add a tuple of 3 long items to the priority queue
+ * Add a tuple of 3 long items to the priority queue.
*
* @param n1
* @param n2
@@ -87,8 +87,8 @@ public class TripleLongPriorityQueue implements AutoCloseable {
/**
* Read the 1st long item in the top tuple in the priority queue.
- * <p>
- * The tuple will not be extracted
+ *
+ * <p>The tuple will not be extracted
*/
public long peekN1() {
checkArgument(size != 0);
@@ -97,8 +97,8 @@ public class TripleLongPriorityQueue implements AutoCloseable {
/**
* Read the 2nd long item in the top tuple in the priority queue.
- * <p>
- * The tuple will not be extracted
+ *
+ * <p>The tuple will not be extracted
*/
public long peekN2() {
checkArgument(size != 0);
@@ -107,8 +107,8 @@ public class TripleLongPriorityQueue implements AutoCloseable {
/**
* Read the 3rd long item in the top tuple in the priority queue.
- * <p>
- * The tuple will not be extracted
+ *
+ * <p>The tuple will not be extracted
*/
public long peekN3() {
checkArgument(size != 0);
@@ -126,14 +126,14 @@ public class TripleLongPriorityQueue implements AutoCloseable {
}
/**
- * Returns whether the priority queue is empty
+ * Returns whether the priority queue is empty.
*/
public boolean isEmpty() {
return size == 0;
}
/**
- * Returns the number of tuples in the priority queue
+ * Returns the number of tuples in the priority queue.
*/
public int size() {
return size;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/package-info.java
similarity index 86%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/package-info.java
index 1151443..b344230 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Set of different collections working in the concurrent environment.
+ */
+package org.apache.pulsar.common.util.collections;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
index addacbe..8e10bcc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.common.util.netty;
-import java.util.concurrent.ThreadFactory;
-
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
@@ -36,7 +34,9 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import java.util.concurrent.ThreadFactory;
+@SuppressWarnings("checkstyle:JavadocType")
public class EventLoopUtil {
/**
@@ -52,7 +52,7 @@ public class EventLoopUtil {
}
/**
- * Return a SocketChannel class suitable for the given EventLoopGroup implementation
+ * Return a SocketChannel class suitable for the given EventLoopGroup implementation.
*
* @param eventLoopGroup
* @return
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/package-info.java
similarity index 85%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/package-info.java
index 1151443..bbb2f42 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Helpers to work with events from the non-blocking I/O client-server framework.
+ */
+package org.apache.pulsar.common.util.netty;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/package-info.java
similarity index 91%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/package-info.java
index 1151443..079a8e0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Common utils.
+ */
+package org.apache.pulsar.common.util;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
index 937b083..e4dad0c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
@@ -47,8 +47,11 @@ import org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite;
import org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException;
import org.apache.pulsar.shaded.com.google.protobuf.v241.WireFormat;
+@SuppressWarnings("checkstyle:JavadocType")
public class ByteBufCodedInputStream {
- public static interface ByteBufMessageBuilder {
+
+ @SuppressWarnings("checkstyle:JavadocType")
+ public interface ByteBufMessageBuilder {
ByteBufMessageBuilder mergeFrom(ByteBufCodedInputStream input, ExtensionRegistryLite ext)
throws java.io.IOException;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java
index 2a58e4a..213920e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java
@@ -45,8 +45,11 @@ import java.io.IOException;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.apache.pulsar.shaded.com.google.protobuf.v241.WireFormat;
+@SuppressWarnings("checkstyle:JavadocType")
public class ByteBufCodedOutputStream {
- public static interface ByteBufGeneratedMessage {
+
+ @SuppressWarnings("checkstyle:JavadocType")
+ public interface ByteBufGeneratedMessage {
int getSerializedSize();
void writeTo(ByteBufCodedOutputStream output) throws IOException;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/package-info.java
similarity index 87%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/package-info.java
index 1151443..ea37fc8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes to encode and decode in the Protocol Buffer format.
+ */
+package org.apache.pulsar.common.util.protobuf;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/BrokerUsage.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/BrokerUsage.java
index 20faad1..5791502 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/BrokerUsage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/BrokerUsage.java
@@ -22,8 +22,7 @@ import java.util.Map;
/**
* {@link BrokerUsage} object encapsulates the resources that are only used by broker, for now, it's connections both to
- * outside JVM and to the local VM
- *
+ * outside JVM and to the local VM.
*/
public class BrokerUsage {
private long connectionCount;
@@ -45,12 +44,12 @@ public class BrokerUsage {
this.replicationConnectionCount = replicationConnectionCount;
}
- /*
- * factory method that returns an instance of this class populated from metrics we expect the keys that we are
- * looking there's no explicit type checked object which guarantees that we have a specific type of metrics
+ /**
+ * Factory method that returns an instance of this class populated from metrics. We expect the keys that we are
+ * looking for; there's no explicit type-checked object which guarantees that we have a specific type of metrics.
*
* @param metrics metrics object containing the metrics collected per minute from mbeans
- *
+ *
* @return new instance of the class populated from metrics
*/
public static BrokerUsage populateFrom(Map<String, Object> metrics) {
@@ -60,8 +59,9 @@ public class BrokerUsage {
brokerUsage.connectionCount = ((Long) metrics.get("brk_conn_cnt")).longValue();
}
if (metrics.containsKey("brk_repl_conn_cnt")) {
- if (brokerUsage == null)
+ if (brokerUsage == null) {
brokerUsage = new BrokerUsage();
+ }
brokerUsage.replicationConnectionCount = ((Long) metrics.get("brk_repl_conn_cnt")).longValue();
}
return brokerUsage;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JSONWritable.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JSONWritable.java
index f197ab6..25bf16f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JSONWritable.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JSONWritable.java
@@ -18,10 +18,9 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
/**
* Helper class used to conveniently convert a data class to a JSON.
@@ -30,7 +29,7 @@ public class JSONWritable {
/**
* Get the JSON of this object as a byte[].
- *
+ *
* @return A byte[] of this object's JSON.
* @throws JsonProcessingException
*/
@@ -41,7 +40,7 @@ public class JSONWritable {
/**
* Get the JSON of this object as a String.
- *
+ *
* @return A String of this object's JSON.
* @throws JsonProcessingException
*/
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JvmUsage.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JvmUsage.java
index ee0dc26..824d6db 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JvmUsage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/JvmUsage.java
@@ -22,7 +22,7 @@ import java.util.Map;
/**
* {@link JvmUsage} represents set of resources that are specific to JVM and are used by broker, load balancing need to
- * know this detail
+ * know this detail.
*
*/
public class JvmUsage {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
index 7fb173b..740ce3a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
@@ -18,51 +18,48 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;
-import java.util.Map;
-
-
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import java.util.Map;
/**
- * This class represents the overall load of the broker - it includes overall SystemResourceUsage and Bundle-usage
+ * This class represents the overall load of the broker - it includes overall SystemResourceUsage and Bundle-usage.
*/
@JsonDeserialize(using = LoadReportDeserializer.class)
public interface LoadManagerReport extends ServiceLookupData {
- public ResourceUsage getCpu();
-
- public ResourceUsage getMemory();
+ ResourceUsage getCpu();
- public ResourceUsage getDirectMemory();
+ ResourceUsage getMemory();
- public ResourceUsage getBandwidthIn();
+ ResourceUsage getDirectMemory();
- public ResourceUsage getBandwidthOut();
+ ResourceUsage getBandwidthIn();
- public long getLastUpdate();
+ ResourceUsage getBandwidthOut();
- public Map<String, NamespaceBundleStats> getBundleStats();
+ long getLastUpdate();
- public int getNumTopics();
+ Map<String, NamespaceBundleStats> getBundleStats();
- public int getNumBundles();
+ int getNumTopics();
- public int getNumConsumers();
+ int getNumBundles();
- public int getNumProducers();
+ int getNumConsumers();
- public double getMsgThroughputIn();
+ int getNumProducers();
- public double getMsgThroughputOut();
+ double getMsgThroughputIn();
- public double getMsgRateIn();
+ double getMsgThroughputOut();
- public double getMsgRateOut();
+ double getMsgRateIn();
- public String getBrokerVersionString();
+ double getMsgRateOut();
- public boolean isPersistentTopicsEnabled();
+ String getBrokerVersionString();
- public boolean isNonPersistentTopicsEnabled();
+ boolean isPersistentTopicsEnabled();
+ boolean isNonPersistentTopicsEnabled();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
index 6ca7744..c800dd0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
@@ -18,19 +18,17 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-
import org.apache.pulsar.common.util.NamespaceBundleStatsComparator;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.google.common.collect.Maps;
-
/**
* This class represents the overall load of the broker - it includes overall {@link SystemResourceUsage} and
* {@link NamespaceUsage} for all the namespaces hosted by this broker.
@@ -57,8 +55,9 @@ public class LoadReport implements LoadManagerReport {
private int numProducers;
private int numBundles;
// This place-holder requires to identify correct LoadManagerReport type while deserializing
+ @SuppressWarnings("checkstyle:ConstantName")
public static final String loadReportType = LoadReport.class.getSimpleName();
-
+
public LoadReport() {
this(null, null, null, null);
}
@@ -83,7 +82,7 @@ public class LoadReport implements LoadManagerReport {
}
/**
- * overall machine resource used, not just by broker process
+ * Overall machine resource used, not just by broker process.
*/
private SystemResourceUsage systemResourceUsage;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReportDeserializer.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReportDeserializer.java
index 6cd54c4..da92002 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReportDeserializer.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReportDeserializer.java
@@ -18,17 +18,18 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;
-import java.io.IOException;
-
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.IOException;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+/**
+ * Deserializer for a load report.
+ */
public class LoadReportDeserializer extends JsonDeserializer<LoadManagerReport> {
@Override
public LoadManagerReport deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index f76ccd1..2f2668c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -18,16 +18,13 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;
-import java.util.HashMap;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.collect.Maps;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.google.common.collect.Maps;
-
-
/**
* Contains all the data that is maintained locally on each broker.
*/
@@ -39,8 +36,8 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
private final String webServiceUrlTls;
private final String pulsarServiceUrl;
private final String pulsarServiceUrlTls;
- private boolean persistentTopicsEnabled=true;
- private boolean nonPersistentTopicsEnabled=true;
+ private boolean persistentTopicsEnabled = true;
+ private boolean nonPersistentTopicsEnabled = true;
// Most recently available system resource usage.
private ResourceUsage cpu;
@@ -79,6 +76,7 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
// The version string that this broker is running, obtained from the Maven build artifact in the POM
private String brokerVersionString;
// This place-holder requires to identify correct LoadManagerReport type while deserializing
+ @SuppressWarnings("checkstyle:ConstantName")
public static final String loadReportType = LocalBrokerData.class.getSimpleName();
// For JSON only.
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
index 9eb40f6..daf5db9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
@@ -32,13 +32,13 @@ public class NamespaceBundleStats implements Comparable<NamespaceBundleStats> {
public long cacheSize;
// Consider the throughput equal if difference is less than 100 KB/s
- private static double ThroughputDifferenceThreshold = 1e5;
+ private final static double throughputDifferenceThreshold = 1e5;
// Consider the msgRate equal if the difference is less than 100
- private static double MsgRateDifferenceThreshold = 100;
+ private final static double msgRateDifferenceThreshold = 100;
// Consider the total topics/producers/consumers equal if the difference is less than 500
- private static long TopicConnectionDifferenceThreshold = 500;
+ private final static long topicConnectionDifferenceThreshold = 500;
// Consider the cache size equal if the difference is less than 100 kb
- private static long CacheSizeDifferenceThreshold = 100000;
+ private final static long cacheSizeDifferenceThreshold = 100000;
public NamespaceBundleStats() {
reset();
@@ -83,7 +83,7 @@ public class NamespaceBundleStats implements Comparable<NamespaceBundleStats> {
public int compareByMsgRate(NamespaceBundleStats other) {
double thisMsgRate = this.msgRateIn + this.msgRateOut;
double otherMsgRate = other.msgRateIn + other.msgRateOut;
- if (Math.abs(thisMsgRate - otherMsgRate) > MsgRateDifferenceThreshold) {
+ if (Math.abs(thisMsgRate - otherMsgRate) > msgRateDifferenceThreshold) {
return Double.compare(thisMsgRate, otherMsgRate);
}
return 0;
@@ -92,28 +92,28 @@ public class NamespaceBundleStats implements Comparable<NamespaceBundleStats> {
public int compareByTopicConnections(NamespaceBundleStats other) {
long thisTopicsAndConnections = this.topics + this.consumerCount + this.producerCount;
long otherTopicsAndConnections = other.topics + other.consumerCount + other.producerCount;
- if (Math.abs(thisTopicsAndConnections - otherTopicsAndConnections) > TopicConnectionDifferenceThreshold) {
+ if (Math.abs(thisTopicsAndConnections - otherTopicsAndConnections) > topicConnectionDifferenceThreshold) {
return Long.compare(thisTopicsAndConnections, otherTopicsAndConnections);
}
return 0;
}
public int compareByCacheSize(NamespaceBundleStats other) {
- if (Math.abs(this.cacheSize - other.cacheSize) > CacheSizeDifferenceThreshold) {
+ if (Math.abs(this.cacheSize - other.cacheSize) > cacheSizeDifferenceThreshold) {
return Long.compare(this.cacheSize, other.cacheSize);
}
return 0;
}
public int compareByBandwidthIn(NamespaceBundleStats other) {
- if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > ThroughputDifferenceThreshold) {
+ if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > throughputDifferenceThreshold) {
return Double.compare(this.msgThroughputIn, other.msgThroughputIn);
}
return 0;
}
public int compareByBandwidthOut(NamespaceBundleStats other) {
- if (Math.abs(this.msgThroughputOut - other.msgThroughputOut) > ThroughputDifferenceThreshold) {
+ if (Math.abs(this.msgThroughputOut - other.msgThroughputOut) > throughputDifferenceThreshold) {
return Double.compare(this.msgThroughputOut, other.msgThroughputOut);
}
return 0;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceUsage.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceUsage.java
index b8364d4..cdbd32c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceUsage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceUsage.java
@@ -25,46 +25,46 @@ package org.apache.pulsar.policies.data.loadbalancer;
*
*/
public class NamespaceUsage {
- /** Total rate of messages produced on the broker. msg/s */
+ /** Total rate of messages produced on the broker (msg/s). */
private double msgRateIn;
- /** Total throughput of messages produced on the broker. byte/s */
+ /** Total throughput of messages produced on the broker (byte/s). */
private double msgThroughputIn;
- /** Rate of persistent messages produced on the broker. msg/s */
+ /** Rate of persistent messages produced on the broker (msg/s). */
private double msgPersistentRateIn;
- /** Throughput of persistent messages produced on the broker. byte/s */
+ /** Throughput of persistent messages produced on the broker (byte/s). */
private double msgPersistentThroughputIn;
- /** Rate of non-persistent messages produced on the broker. msg/s */
+ /** Rate of non-persistent messages produced on the broker (msg/s). */
private double msgNonPersistentRateIn;
- /** Throughput of non-persistent messages produced on the broker. byte/s */
+ /** Throughput of non-persistent messages produced on the broker (byte/s). */
private double msgNonPersistentThroughputIn;
- /** Total rate of messages consumed from the broker. msg/s */
+ /** Total rate of messages consumed from the broker (msg/s). */
private double msgRateOut;
- /** Total throughput of messages consumed from the broker. byte/s */
+ /** Total throughput of messages consumed from the broker (byte/s). */
private double msgThroughputOut;
- /** Number of messages in backlog for the broker */
+ /** Number of messages in backlog for the broker. */
private long msgBacklog;
- /** Space used to store the messages for the broker. bytes */
+ /** Space used to store the messages for the broker (bytes). */
private long storageSize;
- /** total number of producers = producer(queues) + producer(topics) */
+ /** Total number of producers = producer(queues) + producer(topics). */
private long totalProducers;
- /** number of clusters the namespace is replicated on */
+ /** Number of clusters the namespace is replicated on. */
private long totalReplicatedClusters;
- /** total number of queues */
+ /** Total number of queues. */
private long totalQueues;
- /** total number of topics */
+ /** Total number of topics. */
private long totalTopics;
private long activeSubscribers;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
index e497dd2..b46f0e9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
@@ -19,9 +19,11 @@
package org.apache.pulsar.policies.data.loadbalancer;
import java.util.Set;
-
import org.apache.pulsar.common.policies.data.ResourceQuota;
+/**
+ * The class containing information about system resources, allocated quota, and loaded bundles.
+ */
public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
private static final long KBITS_TO_BYTES = 1024 / 8;
@@ -71,7 +73,7 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
}
/**
- * Estimate the load percentage which is the max percentage of all resource usages
+ * Estimate the load percentage which is the max percentage of all resource usages.
*/
private void estimateLoadPercentage() {
double cpuUsed = this.systemResourceUsage.cpu.usage;
@@ -120,8 +122,8 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
Math.max(this.estimatedLoadPercentageMemory, Math.max(this.estimatedLoadPercentageDirectMemory,
Math.max(this.estimatedLoadPercentageBandwidthIn, this.estimatedLoadPercentageBandwidthOut))));
- this.estimatedMessageRate = this.allocatedQuota.getMsgRateIn() + this.allocatedQuota.getMsgRateOut() +
- this.preAllocatedQuota.getMsgRateIn() + this.preAllocatedQuota.getMsgRateOut();
+ this.estimatedMessageRate = this.allocatedQuota.getMsgRateIn() + this.allocatedQuota.getMsgRateOut()
+ + this.preAllocatedQuota.getMsgRateIn() + this.preAllocatedQuota.getMsgRateOut();
}
@@ -149,35 +151,35 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
}
/**
- * Compare two loads based on message rate only
+ * Compare two loads based on message rate only.
*/
public int compareMessageRateTo(ResourceUnitRanking other) {
return Double.compare(this.estimatedMessageRate, other.estimatedMessageRate);
}
/**
- * If the ResourceUnit is idle
+ * If the ResourceUnit is idle.
*/
public boolean isIdle() {
return this.loadedBundles.isEmpty() && this.preAllocatedBundles.isEmpty();
}
/**
- * Check if a ServiceUnit is already loaded by this ResourceUnit
+ * Check if a ServiceUnit is already loaded by this ResourceUnit.
*/
public boolean isServiceUnitLoaded(String suName) {
return this.loadedBundles.contains(suName);
}
/**
- * Check if a ServiceUnit is pre-allocated to this ResourceUnit
+ * Check if a ServiceUnit is pre-allocated to this ResourceUnit.
*/
public boolean isServiceUnitPreAllocated(String suName) {
return this.preAllocatedBundles.contains(suName);
}
/**
- * Pre-allocate a ServiceUnit to this ResourceUnit
+ * Pre-allocate a ServiceUnit to this ResourceUnit.
*/
public void addPreAllocatedServiceUnit(String suName, ResourceQuota quota) {
this.preAllocatedBundles.add(suName);
@@ -186,7 +188,7 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
}
/**
- * Remove a service unit from the loaded bundle list
+ * Remove a service unit from the loaded bundle list.
*/
public void removeLoadedServiceUnit(String suName, ResourceQuota quota) {
if (this.loadedBundles.remove(suName)) {
@@ -196,7 +198,7 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
}
/**
- * Get the pre-allocated bundles
+ * Get the pre-allocated bundles.
*/
public Set<String> getPreAllocatedBundles() {
return this.preAllocatedBundles;
@@ -210,57 +212,58 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
}
/**
- * Get the estimated load percentage
+ * Get the estimated load percentage.
*/
public double getEstimatedLoadPercentage() {
return this.estimatedLoadPercentage;
}
/**
- * Get the estimated message rate
+ * Get the estimated message rate.
*/
public double getEstimatedMessageRate() {
return this.estimatedMessageRate;
}
/**
- * Percentage of CPU allocated to bundle's quota
+ * Percentage of CPU allocated to bundle's quota.
*/
public double getAllocatedLoadPercentageCPU() {
return this.allocatedLoadPercentageCPU;
}
/**
- * Percetage of memory allocated to bundle's quota
+ * Percentage of memory allocated to bundle's quota.
*/
public double getAllocatedLoadPercentageMemory() {
return this.allocatedLoadPercentageMemory;
}
/**
- * Percentage of inbound bandwidth allocated to bundle's quota
+ * Percentage of inbound bandwidth allocated to bundle's quota.
*/
public double getAllocatedLoadPercentageBandwidthIn() {
return this.allocatedLoadPercentageBandwidthIn;
}
/**
- * Percentage of outbound bandwidth allocated to bundle's quota
+ * Percentage of outbound bandwidth allocated to bundle's quota.
*/
public double getAllocatedLoadPercentageBandwidthOut() {
return this.allocatedLoadPercentageBandwidthOut;
}
/**
- * Get the load percentage in String, with detail resource usages
+ * Get the load percentage as a String, with detailed resource usages.
*/
public String getEstimatedLoadPercentageString() {
return String.format(
- "msgrate: %.0f, load: %.1f%% - cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%",
- this.estimatedMessageRate,
- this.estimatedLoadPercentage, this.estimatedLoadPercentageCPU, this.estimatedLoadPercentageMemory,
- this.estimatedLoadPercentageDirectMemory, this.estimatedLoadPercentageBandwidthIn,
- this.estimatedLoadPercentageBandwidthOut);
+ "msgrate: %.0f, load: %.1f%% - cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, "
+ + "bandwidthIn: %.1f%%, bandwidthOut: %.1f%%",
+ this.estimatedMessageRate,
+ this.estimatedLoadPercentage, this.estimatedLoadPercentageCPU, this.estimatedLoadPercentageMemory,
+ this.estimatedLoadPercentageDirectMemory, this.estimatedLoadPercentageBandwidthIn,
+ this.estimatedLoadPercentageBandwidthOut);
}
/**
@@ -271,7 +274,7 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
}
/**
- * Estimate the maximum number namespace bundles a ResourceUnit is able to handle with all resource
+ * Estimate the maximum number of namespace bundles a ResourceUnit is able to handle with all resources.
*/
public static long calculateBrokerMaxCapacity(SystemResourceUsage systemResourceUsage, ResourceQuota defaultQuota) {
double bandwidthOutLimit = systemResourceUsage.bandwidthOut.limit * KBITS_TO_BYTES;
@@ -283,7 +286,7 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
}
/**
- * Calculate how many bundles could be handle with the specified resources
+ * Calculate how many bundles could be handled with the specified resources.
*/
private static long calculateBrokerCapacity(ResourceQuota defaultQuota, double usableCPU, double usableMem,
double usableBandwidthOut, double usableBandwidthIn) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java
index aff454c..b97e314 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.policies.data.loadbalancer;
/**
* POJO used to represents any system specific resource usage this is the format that load manager expects it in.
- *
*/
public class ResourceUsage {
public double usage;
@@ -45,8 +44,8 @@ public class ResourceUsage {
}
/**
- * this may be wrong since we are comparing available and not the usage
- *
+ * This may be wrong since we are comparing available and not the usage.
+ *
* @param o
* @return
*/
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ServiceLookupData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ServiceLookupData.java
index 2cb7971..98f61d7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ServiceLookupData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ServiceLookupData.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;
-// For backwards compatibility purposes.
+/**
+ * For backwards compatibility purposes.
+ */
public interface ServiceLookupData {
- public String getWebServiceUrl();
+ String getWebServiceUrl();
- public String getWebServiceUrlTls();
+ String getWebServiceUrlTls();
- public String getPulsarServiceUrl();
+ String getPulsarServiceUrl();
- public String getPulsarServiceUrlTls();
+ String getPulsarServiceUrlTls();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/SystemResourceUsage.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/SystemResourceUsage.java
index 6864d97..168bbca 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/SystemResourceUsage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/SystemResourceUsage.java
@@ -20,10 +20,12 @@ package org.apache.pulsar.policies.data.loadbalancer;
/**
* This class represents a object which reflects system resource usage per resource and the upper limit on the resource.
- *
*/
public class SystemResourceUsage {
+ /**
+ * Definition of possible resource types.
+ */
public enum ResourceType {
CPU, Memory, BandwidthIn, BandwidthOut
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/package-info.java
similarity index 87%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
copy to pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/package-info.java
index 1151443..158c6c9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/package-info.java
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.api.raw;
-
-public interface RawMessageId {
-
-}
+/**
+ * Classes to define policies for the load distribution.
+ */
+package org.apache.pulsar.policies.data.loadbalancer;
\ No newline at end of file