Posted to commits@kafka.apache.org by jk...@apache.org on 2015/02/04 05:58:45 UTC
[6/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
KAFKA-1915: Add checkstyle for java code.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c6d5bba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c6d5bba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c6d5bba
Branch: refs/heads/trunk
Commit: 1c6d5bbac672cbf49591aed0889510b10e3285fa
Parents: f1ba4ff
Author: Jay Kreps <ja...@gmail.com>
Authored: Mon Feb 2 21:36:21 2015 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 3 09:16:55 2015 -0800
----------------------------------------------------------------------
README.md | 3 +
build.gradle | 6 +
checkstyle/checkstyle.xml | 83 +
checkstyle/import-control.xml | 100 +
.../org/apache/kafka/clients/ClientUtils.java | 48 +
.../kafka/clients/ClusterConnectionStates.java | 9 +-
.../java/org/apache/kafka/clients/Metadata.java | 170 ++
.../org/apache/kafka/clients/NetworkClient.java | 3 +-
.../kafka/clients/consumer/CommitType.java | 12 +
.../kafka/clients/consumer/ConsumerConfig.java | 8 +-
.../kafka/clients/consumer/ConsumerRecords.java | 40 +-
.../kafka/clients/consumer/KafkaConsumer.java | 43 +-
.../clients/consumer/internals/Heartbeat.java | 12 +
.../NoOpConsumerRebalanceCallback.java | 4 +-
.../consumer/internals/SubscriptionState.java | 16 +-
.../kafka/clients/producer/KafkaProducer.java | 25 +-
.../apache/kafka/clients/producer/Producer.java | 6 +-
.../kafka/clients/producer/ProducerConfig.java | 10 +-
.../clients/producer/internals/BufferPool.java | 8 +-
.../clients/producer/internals/Metadata.java | 170 --
.../internals/ProduceRequestResult.java | 1 -
.../producer/internals/RecordAccumulator.java | 40 +-
.../clients/producer/internals/Sender.java | 1 +
.../clients/tools/ProducerPerformance.java | 8 +-
.../java/org/apache/kafka/common/Cluster.java | 4 +-
.../org/apache/kafka/common/MetricName.java | 4 +-
.../org/apache/kafka/common/PartitionInfo.java | 2 +-
.../apache/kafka/common/config/ConfigDef.java | 82 +-
.../NotEnoughReplicasAfterAppendException.java | 33 +-
.../errors/NotEnoughReplicasException.java | 28 +-
.../message/KafkaLZ4BlockInputStream.java | 233 --
.../message/KafkaLZ4BlockOutputStream.java | 387 ---
.../kafka/common/metrics/JmxReporter.java | 57 +-
.../org/apache/kafka/common/metrics/Sensor.java | 12 +-
.../apache/kafka/common/metrics/stats/Rate.java | 2 +-
.../kafka/common/network/NetworkReceive.java | 2 +-
.../apache/kafka/common/network/Selector.java | 9 +-
.../apache/kafka/common/protocol/ApiKeys.java | 12 +-
.../apache/kafka/common/protocol/Protocol.java | 590 +++--
.../kafka/common/protocol/types/Struct.java | 2 +-
.../common/record/ByteBufferOutputStream.java | 2 +-
.../apache/kafka/common/record/Compressor.java | 29 +-
.../common/record/KafkaLZ4BlockInputStream.java | 234 ++
.../record/KafkaLZ4BlockOutputStream.java | 392 +++
.../kafka/common/record/MemoryRecords.java | 36 +-
.../requests/ConsumerMetadataRequest.java | 2 +-
.../requests/ConsumerMetadataResponse.java | 2 +-
.../kafka/common/requests/FetchRequest.java | 2 +-
.../kafka/common/requests/FetchResponse.java | 2 +-
.../kafka/common/requests/HeartbeatRequest.java | 2 +-
.../common/requests/HeartbeatResponse.java | 2 +-
.../kafka/common/requests/JoinGroupRequest.java | 2 +-
.../common/requests/JoinGroupResponse.java | 2 +-
.../common/requests/ListOffsetRequest.java | 4 +-
.../common/requests/ListOffsetResponse.java | 2 +-
.../kafka/common/requests/MetadataRequest.java | 2 +-
.../kafka/common/requests/MetadataResponse.java | 8 +-
.../common/requests/OffsetCommitRequest.java | 4 +-
.../common/requests/OffsetCommitResponse.java | 2 +-
.../common/requests/OffsetFetchRequest.java | 4 +-
.../common/requests/OffsetFetchResponse.java | 2 +-
.../kafka/common/requests/ProduceRequest.java | 2 +-
.../kafka/common/requests/ProduceResponse.java | 2 +-
.../kafka/common/requests/RequestHeader.java | 8 +-
.../kafka/common/requests/ResponseHeader.java | 4 +-
.../apache/kafka/common/utils/ClientUtils.java | 49 -
.../org/apache/kafka/common/utils/Crc32.java | 2338 ++----------------
.../org/apache/kafka/common/utils/Utils.java | 44 +-
.../apache/kafka/clients/ClientUtilsTest.java | 42 +
.../org/apache/kafka/clients/MockClient.java | 18 +-
.../apache/kafka/clients/NetworkClientTest.java | 17 +-
.../clients/consumer/MockConsumerTest.java | 16 +
.../internals/SubscriptionStateTest.java | 16 +
.../kafka/clients/producer/BufferPoolTest.java | 14 +-
.../kafka/clients/producer/MetadataTest.java | 8 +-
.../clients/producer/MockProducerTest.java | 1 +
.../kafka/clients/producer/PartitionerTest.java | 2 +-
.../clients/producer/RecordAccumulatorTest.java | 3 +-
.../kafka/clients/producer/RecordSendTest.java | 5 +-
.../kafka/clients/producer/SenderTest.java | 6 +-
.../kafka/common/config/AbstractConfigTest.java | 106 +-
.../kafka/common/config/ConfigDefTest.java | 5 +-
.../common/metrics/FakeMetricsReporter.java | 32 +
.../kafka/common/metrics/MetricsTest.java | 4 +-
.../common/metrics/stats/HistogramTest.java | 1 -
.../kafka/common/network/SelectorTest.java | 1 -
.../types/ProtocolSerializationTest.java | 14 +-
.../kafka/common/record/MemoryRecordsTest.java | 2 +-
.../apache/kafka/common/record/RecordTest.java | 4 +-
.../common/requests/RequestResponseTest.java | 23 +-
.../common/serialization/SerializationTest.java | 4 +-
.../kafka/common/utils/ClientUtilsTest.java | 42 -
.../org/apache/kafka/common/utils/CrcTest.java | 8 +-
.../org/apache/kafka/test/Microbenchmarks.java | 1 -
.../java/org/apache/kafka/test/TestUtils.java | 16 +-
.../consumer/ConsumerRebalanceListener.java | 3 -
.../kafka/message/CompressionFactory.scala | 2 +-
.../scala/kafka/tools/KafkaMigrationTool.java | 12 +-
core/src/main/scala/kafka/utils/Crc32.java | 40 +-
.../java/kafka/examples/SimpleConsumerDemo.java | 9 +-
100 files changed, 2225 insertions(+), 3721 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 9bdcf70..784daaf 100644
--- a/README.md
+++ b/README.md
@@ -98,6 +98,9 @@ Please note for this to work you should create/update `~/.gradle/gradle.properti
### Determining how transitive dependencies are added ###
./gradlew core:dependencies --configuration runtime
+
+### Running checkstyle on the java code ###
+ ./gradlew checkstyleMain checkstyleTest
### Running in Vagrant ###
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6844372..0f0fe60 100644
--- a/build.gradle
+++ b/build.gradle
@@ -345,6 +345,7 @@ project(':examples') {
}
project(':clients') {
+ apply plugin: 'checkstyle'
archivesBaseName = "kafka-clients"
dependencies {
@@ -379,4 +380,9 @@ project(':clients') {
artifacts {
archives testJar
}
+
+ checkstyle {
+ configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+ }
+ test.dependsOn('checkstyleMain', 'checkstyleTest')
}
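With this wiring, running the clients tests implies the style checks: test.dependsOn('checkstyleMain', 'checkstyleTest') makes any ./gradlew test invocation in the clients project fail on style violations before the tests run. The checkstyleMain and checkstyleTest tasks come from the standard Gradle checkstyle plugin applied above, so they can presumably also be invoked on their own with project-qualified task names (e.g. ./gradlew clients:checkstyleMain), though only the combined form is documented in the README change above.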
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
new file mode 100644
index 0000000..a215ff3
--- /dev/null
+++ b/checkstyle/checkstyle.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE module PUBLIC
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+<module name="Checker">
+ <property name="localeLanguage" value="en"/>
+
+ <module name="FileTabCharacter"/>
+
+ <!-- header -->
+ <module name="RegexpHeader">
+ <property name="header" value="/\*\*\nLicensed to the Apache.*"/>
+ </module>
+
+ <module name="TreeWalker">
+
+ <!-- code cleanup -->
+ <module name="UnusedImports"/>
+ <module name="RedundantImport"/>
+ <module name="IllegalImport" />
+ <module name="EqualsHashCode"/>
+ <module name="SimplifyBooleanExpression"/>
+ <module name="OneStatementPerLine"/>
+ <module name="UnnecessaryParentheses" />
+ <module name="SimplifyBooleanReturn"/>
+
+ <!-- style -->
+ <module name="DefaultComesLast"/>
+ <module name="EmptyStatement"/>
+ <module name="ArrayTypeStyle"/>
+ <module name="UpperEll"/>
+ <module name="LeftCurly"/>
+ <module name="RightCurly"/>
+ <module name="EmptyStatement"/>
+ <module name="ConstantName">
+ <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+ </module>
+ <module name="LocalVariableName"/>
+ <module name="LocalFinalVariableName"/>
+ <module name="ClassTypeParameterName"/>
+ <module name="MemberName"/>
+ <module name="MethodTypeParameterName"/>
+ <module name="PackageName"/>
+ <module name="ParameterName"/>
+ <module name="StaticVariableName"/>
+ <module name="TypeName"/>
+
+ <!-- dependencies -->
+ <module name="ImportControl">
+ <property name="file" value="${basedir}/checkstyle/import-control.xml"/>
+ </module>
+
+ <!-- whitespace -->
+ <module name="GenericWhitespace"/>
+ <module name="NoWhitespaceBefore"/>
+ <module name="WhitespaceAfter" />
+ <module name="NoWhitespaceAfter"/>
+ <module name="WhitespaceAround">
+ <property name="allowEmptyConstructors" value="true"/>
+ <property name="allowEmptyMethods" value="true"/>
+ </module>
+ <module name="Indentation"/>
+ <module name="MethodParamPad"/>
+ <module name="ParenPad"/>
+ <module name="TypecastParenPad"/>
+ </module>
+</module>
\ No newline at end of file
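For a sense of what this configuration enforces, here is a small hypothetical class (not part of the commit) that passes the main checks: RegexpHeader requires the Apache license block at the top of every file, ConstantName requires UPPER_SNAKE_CASE constants but explicitly exempts the conventional lowercase log field, WhitespaceAfter is what drives the many <K,V> to <K, V> changes later in this diff, and LeftCurly/RightCurly pin brace placement.

    /**
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements. (Header matching the RegexpHeader rule above.)
    */
    package org.apache.kafka.example; // hypothetical package, for illustration only

    import java.util.HashMap;
    import java.util.Map;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class StyleExample {

        private static final Logger log = LoggerFactory.getLogger(StyleExample.class); // ConstantName exempts "log"
        private static final int MAX_RETRIES = 3; // constants must match ^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$

        private final Map<String, Integer> attempts = new HashMap<String, Integer>(); // WhitespaceAfter: space after ','

        public int clamp(int value) {
            if (value > MAX_RETRIES) { // LeftCurly: opening brace on the same line
                return MAX_RETRIES;
            } else { // RightCurly: "} else {" on one line, as in the ClusterConnectionStates hunk below
                return value;
            }
        }
    }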
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
new file mode 100644
index 0000000..cca4b38
--- /dev/null
+++ b/checkstyle/import-control.xml
@@ -0,0 +1,100 @@
+<!DOCTYPE import-control PUBLIC
+ "-//Puppy Crawl//DTD Import Control 1.1//EN"
+ "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+<import-control pkg="org.apache.kafka">
+
+ <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+ <!-- common library dependencies -->
+ <allow pkg="java" />
+ <allow pkg="javax.management" />
+ <allow pkg="org.slf4j" />
+ <allow pkg="org.junit" />
+
+ <!-- no one depends on the server -->
+ <disallow pkg="kafka" />
+
+ <!-- anyone can use public classes -->
+ <allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.utils" />
+
+ <subpackage name="common">
+ <disallow pkg="org.apache.kafka.clients" />
+ <allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.test" />
+
+ <subpackage name="config">
+ <allow pkg="org.apache.kafka.common.config" />
+ <!-- for testing -->
+ <allow pkg="org.apache.kafka.common.metrics" />
+ </subpackage>
+
+ <subpackage name="metrics">
+ <allow pkg="org.apache.kafka.common.metrics" />
+ </subpackage>
+
+ <subpackage name="network">
+ <allow pkg="org.apache.kafka.common.metrics" />
+ </subpackage>
+
+ <subpackage name="protocol">
+ <allow pkg="org.apache.kafka.common.errors" />
+ <allow pkg="org.apache.kafka.common.protocol.types" />
+ </subpackage>
+
+ <subpackage name="record">
+ <allow pkg="net.jpountz" />
+ <allow pkg="org.apache.kafka.common.record" />
+ </subpackage>
+
+ <subpackage name="requests">
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.network" />
+ </subpackage>
+
+ <subpackage name="serialization">
+ <allow class="org.apache.kafka.common.errors.SerializationException" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="clients">
+ <allow pkg="org.apache.kafka.common" />
+ <allow pkg="org.slf4j" />
+ <allow pkg="org.apache.kafka.clients" exact-match="true"/>
+ <allow pkg="org.apache.kafka.test" />
+
+ <subpackage name="consumer">
+ <allow pkg="org.apache.kafka.clients.consumer" />
+ </subpackage>
+
+ <subpackage name="producer">
+ <allow pkg="org.apache.kafka.clients.producer" />
+ </subpackage>
+
+ <subpackage name="tools">
+ <allow pkg="org.apache.kafka.clients.producer" />
+ <allow pkg="org.apache.kafka.clients.consumer" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="test">
+ <allow pkg="org.apache.kafka" />
+ </subpackage>
+
+</import-control>
\ No newline at end of file
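The layering this encodes: anything may use the public org.apache.kafka.common classes and org.apache.kafka.common.utils, common must never reach into clients, and nothing in the clients jar may import the old server-side kafka.* packages. A hypothetical file (not from the commit) showing imports this would accept or reject:

    package org.apache.kafka.common.metrics;

    import org.apache.kafka.common.MetricName;         // allowed: exact-match on org.apache.kafka.common
    import org.apache.kafka.common.metrics.stats.Rate; // allowed: the metrics subpackage may use itself

    // import org.apache.kafka.clients.NetworkClient;  // rejected: common must not depend on clients
    // import kafka.utils.Crc32;                       // rejected: "no one depends on the server"

    public class LayeringExample {
        // Fields declared only so the imports above are used.
        private MetricName name;
        private Rate rate;
    }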
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
new file mode 100644
index 0000000..d0da5d7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+
+public class ClientUtils {
+
+ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+ List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+ for (String url : urls) {
+ if (url != null && url.length() > 0) {
+ String host = getHost(url);
+ Integer port = getPort(url);
+ if (host == null || port == null)
+ throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ try {
+ InetSocketAddress address = new InetSocketAddress(host, port);
+ if (address.isUnresolved())
+ throw new ConfigException("DNS resolution failed for url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ addresses.add(address);
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ }
+ }
+ }
+ if (addresses.size() < 1)
+ throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ return addresses;
+ }
+}
\ No newline at end of file
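ClientUtils moves from org.apache.kafka.common.utils into org.apache.kafka.clients (the old copy is deleted per the diffstat above), since bootstrap-address parsing is client-level logic rather than a common utility. A minimal usage sketch (hypothetical, not in the commit) of the method it provides:

    import java.net.InetSocketAddress;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.kafka.clients.ClientUtils;

    public class BootstrapExample {
        public static void main(String[] args) {
            // Each entry must parse as host:port and resolve in DNS, or a ConfigException is thrown.
            List<InetSocketAddress> addresses =
                    ClientUtils.parseAndValidateAddresses(Arrays.asList("localhost:9092", "localhost:9093"));
            System.out.println(addresses);
        }
    }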
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 574287d..da76cc2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -69,8 +69,7 @@ final class ClusterConnectionStates {
long timeWaited = now - state.lastConnectAttemptMs;
if (state.state == ConnectionState.DISCONNECTED) {
return Math.max(this.reconnectBackoffMs - timeWaited, 0);
- }
- else {
+ } else {
// When connecting or connected, we should be able to delay indefinitely since other events (connection or
// data acked) will cause a wakeup once data can be sent.
return Long.MAX_VALUE;
@@ -109,7 +108,8 @@ final class ClusterConnectionStates {
* @param node The node we have connected to
*/
public void connected(int node) {
- nodeState(node).state = ConnectionState.CONNECTED;
+ NodeConnectionState nodeState = nodeState(node);
+ nodeState.state = ConnectionState.CONNECTED;
}
/**
@@ -117,7 +117,8 @@ final class ClusterConnectionStates {
* @param node The node we have disconnected from
*/
public void disconnected(int node) {
- nodeState(node).state = ConnectionState.DISCONNECTED;
+ NodeConnectionState nodeState = nodeState(node);
+ nodeState.state = ConnectionState.DISCONNECTED;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
new file mode 100644
index 0000000..b8cdd14
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class encapsulating some of the logic around metadata.
+ * <p>
+ * This class is shared by the client thread (for partitioning) and the background sender thread.
+ *
+ * Metadata is maintained for only a subset of topics, which can be added to over time. Requesting metadata for a
+ * topic we have no metadata for yet will trigger a metadata update.
+ */
+public final class Metadata {
+
+ private static final Logger log = LoggerFactory.getLogger(Metadata.class);
+
+ private final long refreshBackoffMs;
+ private final long metadataExpireMs;
+ private int version;
+ private long lastRefreshMs;
+ private Cluster cluster;
+ private boolean needUpdate;
+ private final Set<String> topics;
+
+ /**
+ * Create a metadata instance with reasonable defaults
+ */
+ public Metadata() {
+ this(100L, 60 * 60 * 1000L);
+ }
+
+ /**
+ * Create a new Metadata instance
+ * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
+ * polling
+ * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+ */
+ public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+ this.refreshBackoffMs = refreshBackoffMs;
+ this.metadataExpireMs = metadataExpireMs;
+ this.lastRefreshMs = 0L;
+ this.version = 0;
+ this.cluster = Cluster.empty();
+ this.needUpdate = false;
+ this.topics = new HashSet<String>();
+ }
+
+ /**
+ * Get the current cluster info without blocking
+ */
+ public synchronized Cluster fetch() {
+ return this.cluster;
+ }
+
+ /**
+ * Add the topic to maintain in the metadata
+ */
+ public synchronized void add(String topic) {
+ topics.add(topic);
+ }
+
+ /**
+ * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
+ * current info can be updated (i.e. the backoff time has elapsed); if an update has been requested then the expiry time
+ * is now
+ */
+ public synchronized long timeToNextUpdate(long nowMs) {
+ long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
+ long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
+ return Math.max(timeToExpire, timeToAllowUpdate);
+ }
+
+ /**
+ * Request an update of the current cluster metadata info, return the current version before the update
+ */
+ public synchronized int requestUpdate() {
+ this.needUpdate = true;
+ return this.version;
+ }
+
+ /**
+ * Wait for metadata update until the current version is larger than the last version we know of
+ */
+ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+ if (maxWaitMs < 0) {
+ throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
+ }
+ long begin = System.currentTimeMillis();
+ long remainingWaitMs = maxWaitMs;
+ while (this.version <= lastVersion) {
+ try {
+ if (remainingWaitMs != 0) {
+ wait(remainingWaitMs);
+ }
+ } catch (InterruptedException e) { /* this is fine */
+ }
+ long elapsed = System.currentTimeMillis() - begin;
+ if (elapsed >= maxWaitMs)
+ throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+ remainingWaitMs = maxWaitMs - elapsed;
+ }
+ }
+
+ /**
+ * Add one or more topics to maintain metadata for
+ */
+ public synchronized void addTopics(String... topics) {
+ for (String topic : topics)
+ this.topics.add(topic);
+ requestUpdate();
+ }
+
+ /**
+ * Get the list of topics we are currently maintaining metadata for
+ */
+ public synchronized Set<String> topics() {
+ return new HashSet<String>(this.topics);
+ }
+
+ /**
+ * Update the cluster metadata
+ */
+ public synchronized void update(Cluster cluster, long now) {
+ this.needUpdate = false;
+ this.lastRefreshMs = now;
+ this.version += 1;
+ this.cluster = cluster;
+ notifyAll();
+ log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
+ }
+
+ /**
+ * @return The current metadata version
+ */
+ public synchronized int version() {
+ return this.version;
+ }
+
+ /**
+ * The last time metadata was updated.
+ */
+ public synchronized long lastUpdate() {
+ return this.lastRefreshMs;
+ }
+
+ /**
+ * The metadata refresh backoff in ms
+ */
+ public long refreshBackoff() {
+ return refreshBackoffMs;
+ }
+}
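Like ClientUtils, Metadata moves up from org.apache.kafka.clients.producer.internals (the old copy is deleted below) into the shared org.apache.kafka.clients package so the new consumer can reuse it. Its blocking protocol, as a sketch (hypothetical, not from the commit): a caller snapshots the version via requestUpdate(), and awaitUpdate() blocks until update() from the I/O thread bumps the version and calls notifyAll():

    import org.apache.kafka.clients.Metadata;
    import org.apache.kafka.common.Cluster;

    public class MetadataExample {
        public static void main(String[] args) {
            final Metadata metadata = new Metadata(100L, 60 * 60 * 1000L);
            metadata.add("my-topic");

            final int version = metadata.requestUpdate(); // mark stale, remember the current version

            // In the real clients, NetworkClient performs the update from the sender/I-O thread.
            new Thread(new Runnable() {
                public void run() {
                    metadata.update(Cluster.empty(), System.currentTimeMillis()); // bumps version, notifyAll()
                }
            }).start();

            metadata.awaitUpdate(version, 5000L); // returns once version > the snapshot, or throws TimeoutException
            System.out.println("Cluster: " + metadata.fetch());
        }
    }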
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 5950191..fef90a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,7 +19,6 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
-import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
@@ -199,7 +198,7 @@ public class NetworkClient implements KafkaClient {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
- long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
+ long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
// if there is no node available to connect, back off refreshing metadata
long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
waitForMetadataFetch);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
index 072cc2e..7548a9b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
package org.apache.kafka.clients.consumer;
public enum CommitType {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 6d4ff7c..5fb2100 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
* The consumer configuration keys
*/
public class ConsumerConfig extends AbstractConfig {
- private static final ConfigDef config;
+ private static final ConfigDef CONFIG;
/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
@@ -154,7 +154,7 @@ public class ConsumerConfig extends AbstractConfig {
private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
static {
- config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+ CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
@@ -277,11 +277,11 @@ public class ConsumerConfig extends AbstractConfig {
}
ConsumerConfig(Map<? extends Object, ? extends Object> props) {
- super(config, props);
+ super(CONFIG, props);
}
public static void main(String[] args) {
- System.out.println(config.toHtmlTable());
+ System.out.println(CONFIG.toHtmlTable());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 416d703..305ec8e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -26,11 +26,11 @@ import org.apache.kafka.common.utils.AbstractIterator;
* particular topic. There is one for every topic returned by a
* {@link Consumer#poll(long)} operation.
*/
-public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
+public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
- private final Map<TopicPartition, List<ConsumerRecord<K,V>>> records;
+ private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
- public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K,V>>> records) {
+ public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
this.records = records;
}
@@ -39,8 +39,8 @@ public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
*
* @param partition The partition to get records for
*/
- public Iterable<ConsumerRecord<K,V>> records(TopicPartition partition) {
- List<ConsumerRecord<K,V>> recs = this.records.get(partition);
+ public Iterable<ConsumerRecord<K, V>> records(TopicPartition partition) {
+ List<ConsumerRecord<K, V>> recs = this.records.get(partition);
if (recs == null)
return Collections.emptyList();
else
@@ -50,20 +50,20 @@ public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
/**
* Get just the records for the given topic
*/
- public Iterable<ConsumerRecord<K,V>> records(String topic) {
+ public Iterable<ConsumerRecord<K, V>> records(String topic) {
if (topic == null)
throw new IllegalArgumentException("Topic must be non-null.");
- List<List<ConsumerRecord<K,V>>> recs = new ArrayList<List<ConsumerRecord<K,V>>>();
- for (Map.Entry<TopicPartition, List<ConsumerRecord<K,V>>> entry : records.entrySet()) {
+ List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
+ for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
if (entry.getKey().equals(topic))
recs.add(entry.getValue());
}
- return new ConcatenatedIterable<K,V>(recs);
+ return new ConcatenatedIterable<K, V>(recs);
}
@Override
- public Iterator<ConsumerRecord<K,V>> iterator() {
- return new ConcatenatedIterable<K,V>(records.values()).iterator();
+ public Iterator<ConsumerRecord<K, V>> iterator() {
+ return new ConcatenatedIterable<K, V>(records.values()).iterator();
}
/**
@@ -71,26 +71,26 @@ public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
*/
public int count() {
int count = 0;
- for(List<ConsumerRecord<K,V>> recs: this.records.values())
+ for (List<ConsumerRecord<K, V>> recs: this.records.values())
count += recs.size();
return count;
}
- private static class ConcatenatedIterable<K,V> implements Iterable<ConsumerRecord<K,V>> {
+ private static class ConcatenatedIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
- private final Iterable<? extends Iterable<ConsumerRecord<K,V>>> iterables;
+ private final Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables;
- public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K,V>>> iterables) {
+ public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables) {
this.iterables = iterables;
}
@Override
- public Iterator<ConsumerRecord<K,V>> iterator() {
- return new AbstractIterator<ConsumerRecord<K,V>>() {
- Iterator<? extends Iterable<ConsumerRecord<K,V>>> iters = iterables.iterator();
- Iterator<ConsumerRecord<K,V>> current;
+ public Iterator<ConsumerRecord<K, V>> iterator() {
+ return new AbstractIterator<ConsumerRecord<K, V>>() {
+ Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters = iterables.iterator();
+ Iterator<ConsumerRecord<K, V>> current;
- public ConsumerRecord<K,V> makeNext() {
+ public ConsumerRecord<K, V> makeNext() {
if (current == null || !current.hasNext()) {
if (iters.hasNext())
current = iters.next().iterator();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 300c551..09a6f11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.ConnectionState;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@@ -78,7 +78,6 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
-import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -380,7 +379,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
private static final long LATEST_OFFSET_TIMESTAMP = -1L;
- private static final AtomicInteger consumerAutoId = new AtomicInteger(1);
+ private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private final Time time;
private final ConsumerMetrics metrics;
@@ -547,15 +546,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
TimeUnit.MILLISECONDS);
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
String jmxPrefix = "kafka.consumer";
- if(clientId .length() <= 0)
- clientId = "consumer-" + consumerAutoId.getAndIncrement();
+ if (clientId.length() <= 0)
+ clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(jmxPrefix));
Metrics metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricsGroup = "consumer";
@@ -1554,23 +1553,31 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
"The maximum lag for any partition in this window",
tags), new Max());
+ Measurable numParts =
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return subscriptions.assignedPartitions().size();
+ }
+ };
metrics.addMetric(new MetricName("assigned-partitions",
metricsGroup,
"The number of partitions currently assigned to this consumer",
- tags), new Measurable() {
- public double measure(MetricConfig config, long now) {
- return subscriptions.assignedPartitions().size();
- }
- });
-
+ tags),
+ numParts);
+
+
+ Measurable lastHeartbeat =
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+ }
+ };
metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
metricsGroup,
"The number of seconds since the last controller heartbeat",
- tags), new Measurable() {
- public double measure(MetricConfig config, long now) {
- return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
- }
- });
+ tags),
+
+ lastHeartbeat);
}
public void recordTopicFetchMetrics(String topic, int bytes, int records) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index d9483ec..ee0751e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
package org.apache.kafka.clients.consumer.internals;
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
index 7e57a39..c06ab3a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.TopicPartition;
public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
@Override
- public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
@Override
- public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+ public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 71ce20d..d41d306 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
package org.apache.kafka.clients.consumer.internals;
import java.util.HashMap;
@@ -58,8 +70,8 @@ public class SubscriptionState {
throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
this.subscribedTopics.remove(topic);
this.needsPartitionAssignment = true;
- for(TopicPartition tp: assignedPartitions())
- if(topic.equals(tp.topic()))
+ for (TopicPartition tp: assignedPartitions())
+ if (topic.equals(tp.topic()))
clearPartition(tp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ebc4c53..1fd6917 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -19,8 +19,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.clients.producer.internals.Partitioner;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
@@ -45,7 +46,6 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@@ -60,9 +60,10 @@ import org.slf4j.LoggerFactory;
* The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
* needs to communicate with. Failure to close the producer after use will leak these resources.
*/
-public class KafkaProducer<K,V> implements Producer<K,V> {
+public class KafkaProducer<K, V> implements Producer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
+ private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private final Partitioner partitioner;
private final int maxRequestSize;
@@ -79,7 +80,6 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final ProducerConfig producerConfig;
- private static final AtomicInteger producerAutoId = new AtomicInteger(1);
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -154,6 +154,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
return newProperties;
}
+ @SuppressWarnings("unchecked")
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
log.trace("Starting the Kafka producer");
this.producerConfig = config;
@@ -162,8 +163,8 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
- if(clientId.length() <= 0)
- clientId = "producer-" + producerAutoId.getAndIncrement();
+ if (clientId.length() <= 0)
+ clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
String jmxPrefix = "kafka.producer";
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
@@ -216,16 +217,16 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(), true);
- }
- else
+ } else {
this.keySerializer = keySerializer;
+ }
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
- }
- else
+ } else {
this.valueSerializer = valueSerializer;
+ }
config.logUnused();
log.debug("Kafka producer started");
@@ -244,7 +245,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
* @param record The record to be sent
*/
@Override
- public Future<RecordMetadata> send(ProducerRecord<K,V> record) {
+ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
@@ -309,7 +310,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
* indicates no callback)
*/
@Override
- public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
+ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
try {
// first make sure the metadata for the topic is available
waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 6b2471f..17fe541 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.MetricName;
* @see KafkaProducer
* @see MockProducer
*/
-public interface Producer<K,V> extends Closeable {
+public interface Producer<K, V> extends Closeable {
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
@@ -39,12 +39,12 @@ public interface Producer<K,V> extends Closeable {
* @param record The record to send
* @return A future which will eventually contain the response information
*/
- public Future<RecordMetadata> send(ProducerRecord<K,V> record);
+ public Future<RecordMetadata> send(ProducerRecord<K, V> record);
/**
* Send a record and invoke the given callback when the record has been acknowledged by the server
*/
- public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
+ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
/**
* Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 9a43d66..122375c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -35,7 +35,7 @@ public class ProducerConfig extends AbstractConfig {
* CHANGE WILL BREAK USER CODE.
*/
- private static final ConfigDef config;
+ private static final ConfigDef CONFIG;
/** <code>bootstrap.servers</code> */
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
@@ -167,13 +167,13 @@ public class ProducerConfig extends AbstractConfig {
private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
static {
- config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
Type.STRING,
"1",
- in("all","-1", "0", "1"),
+ in("all", "-1", "0", "1"),
Importance.HIGH,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
@@ -218,11 +218,11 @@ public class ProducerConfig extends AbstractConfig {
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
- super(config, props);
+ super(CONFIG, props);
}
public static void main(String[] args) {
- System.out.println(config.toHtmlTable());
+ System.out.println(CONFIG.toHtmlTable());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 8d4156d..4cb1e50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -80,11 +80,11 @@ public final class BufferPool {
this.time = time;
this.waitTime = this.metrics.sensor("bufferpool-wait-time");
MetricName metricName = new MetricName("bufferpool-wait-ratio",
- metricGrpName,
- "The fraction of time an appender waits for space allocation.",
- metricTags);
+ metricGrpName,
+ "The fraction of time an appender waits for space allocation.",
+ metricTags);
this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
- }
+ }
/**
* Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
deleted file mode 100644
index 3aff624..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A class encapsulating some of the logic around metadata.
- * <p>
- * This class is shared by the client thread (for partitioning) and the background sender thread.
- *
- * Metadata is maintained for only a subset of topics, which can be added to over time. Requesting metadata for a
- * topic we have no metadata for yet will trigger a metadata update.
- */
-public final class Metadata {
-
- private static final Logger log = LoggerFactory.getLogger(Metadata.class);
-
- private final long refreshBackoffMs;
- private final long metadataExpireMs;
- private int version;
- private long lastRefreshMs;
- private Cluster cluster;
- private boolean needUpdate;
- private final Set<String> topics;
-
- /**
- * Create a metadata instance with reasonable defaults
- */
- public Metadata() {
- this(100L, 60 * 60 * 1000L);
- }
-
- /**
- * Create a new Metadata instance
- * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
- * polling
- * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
- */
- public Metadata(long refreshBackoffMs, long metadataExpireMs) {
- this.refreshBackoffMs = refreshBackoffMs;
- this.metadataExpireMs = metadataExpireMs;
- this.lastRefreshMs = 0L;
- this.version = 0;
- this.cluster = Cluster.empty();
- this.needUpdate = false;
- this.topics = new HashSet<String>();
- }
-
- /**
- * Get the current cluster info without blocking
- */
- public synchronized Cluster fetch() {
- return this.cluster;
- }
-
- /**
- * Add the topic to maintain in the metadata
- */
- public synchronized void add(String topic) {
- topics.add(topic);
- }
-
- /**
- * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
- * current info can be updated (i.e. the backoff time has elapsed); if an update has been requested then the expiry time
- * is now
- */
- public synchronized long timeToNextUpdate(long nowMs) {
- long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
- long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
- return Math.max(timeToExpire, timeToAllowUpdate);
- }
-
- /**
- * Request an update of the current cluster metadata info, return the current version before the update
- */
- public synchronized int requestUpdate() {
- this.needUpdate = true;
- return this.version;
- }
-
- /**
- * Wait for metadata update until the current version is larger than the last version we know of
- */
- public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
- if (maxWaitMs < 0) {
- throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
- }
- long begin = System.currentTimeMillis();
- long remainingWaitMs = maxWaitMs;
- while (this.version <= lastVersion) {
- try {
- if (remainingWaitMs != 0) {
- wait(remainingWaitMs);
- }
- } catch (InterruptedException e) { /* this is fine */
- }
- long elapsed = System.currentTimeMillis() - begin;
- if (elapsed >= maxWaitMs)
- throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
- remainingWaitMs = maxWaitMs - elapsed;
- }
- }
-
- /**
- * Add one or more topics to maintain metadata for
- */
- public synchronized void addTopics(String... topics) {
- for (String topic : topics)
- this.topics.add(topic);
- requestUpdate();
- }
-
- /**
- * Get the list of topics we are currently maintaining metadata for
- */
- public synchronized Set<String> topics() {
- return new HashSet<String>(this.topics);
- }
-
- /**
- * Update the cluster metadata
- */
- public synchronized void update(Cluster cluster, long now) {
- this.needUpdate = false;
- this.lastRefreshMs = now;
- this.version += 1;
- this.cluster = cluster;
- notifyAll();
- log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
- }
-
- /**
- * @return The current metadata version
- */
- public synchronized int version() {
- return this.version;
- }
-
- /**
- * The last time metadata was updated.
- */
- public synchronized long lastUpdate() {
- return this.lastRefreshMs;
- }
-
- /**
- * The metadata refresh backoff in ms
- */
- public long refreshBackoff() {
- return refreshBackoffMs;
- }
-}
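
For reference, a minimal sketch of the request/await cycle this (now relocated) Metadata class supports; the topic name, timeout, and sketch class below are illustrative, not part of this commit:

    // Hypothetical driver for the metadata request/await cycle.
    import org.apache.kafka.clients.Metadata;
    import org.apache.kafka.common.Cluster;

    public class MetadataUsageSketch {
        public static void main(String[] args) {
            Metadata metadata = new Metadata(100L, 60 * 60 * 1000L);
            metadata.add("my-topic");               // maintain metadata for this topic
            int version = metadata.requestUpdate(); // mark stale; returns the pre-update version
            // A background sender would fetch cluster state and call
            // metadata.update(cluster, nowMs), bumping the version and waking waiters;
            // the client thread would then block on:
            // metadata.awaitUpdate(version, 5000L); // TimeoutException after 5 s
            Cluster cluster = metadata.fetch();     // non-blocking read of the current cluster
            System.out.println("version " + metadata.version() + ": " + cluster);
        }
    }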
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
index b70ece7..8e5855d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.producer.internals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 50889e4..ecfe214 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -102,26 +102,27 @@ public final class RecordAccumulator {
private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) {
MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags);
- metrics.addMetric(metricName,
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.queued();
- }
- });
+ Measurable waitingThreads = new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.queued();
+ }
+ };
+ metrics.addMetric(metricName, waitingThreads);
+
metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags);
- metrics.addMetric(metricName,
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.totalMemory();
- }
- });
+ Measurable totalBytes = new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.totalMemory();
+ }
+ };
+ metrics.addMetric(metricName, totalBytes);
metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags);
- metrics.addMetric(metricName,
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.availableMemory();
- }
- });
+ Measurable availableBytes = new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.availableMemory();
+ }
+ };
+ metrics.addMetric(metricName, availableBytes);
}
/**
@@ -228,8 +229,7 @@ public final class RecordAccumulator {
boolean sendable = full || expired || exhausted || closed;
if (sendable && !backingOff) {
readyNodes.add(leader);
- }
- else {
+ } else {
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
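
The registerMetrics() change above pulls each anonymous Measurable into a named local before registering it. A standalone sketch of that pattern, with an illustrative metric name, group, and stand-in measurement:

    // Sketch of named-Measurable registration (metric name, group, and value are illustrative).
    import java.util.Collections;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Measurable;
    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;

    public class MetricRegistrationSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            MetricName name = new MetricName("queue-depth", "example-metrics",
                    "Illustrative gauge of a queue depth", Collections.<String, String>emptyMap());
            // Name the anonymous class first, then register it -- the same shape
            // the checkstyle pass gives registerMetrics() above.
            Measurable queueDepth = new Measurable() {
                public double measure(MetricConfig config, long now) {
                    return 42.0; // stand-in for free.queued() and friends
                }
            };
            metrics.addMetric(name, queueDepth);
        }
    }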
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8726809..ed9c63a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 689bae9..13f4d59 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -45,12 +45,12 @@ public class ProducerPerformance {
}
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[],byte[]>(props);
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
/* setup perf test */
byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
- ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>(topicName, payload);
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
long sleepTime = NS_PER_SEC / throughput;
long sleepDeficitNs = 0;
Stats stats = new Stats(numRecords, 5000);
@@ -66,8 +66,8 @@ public class ProducerPerformance {
* and then make up the whole deficit in one longer sleep.
*/
if (throughput > 0) {
- float elapsed = (sendStart - start)/1000.f;
- if (elapsed > 0 && i/elapsed > throughput) {
+ float elapsed = (sendStart - start) / 1000.f;
+ if (elapsed > 0 && i / elapsed > throughput) {
sleepDeficitNs += sleepTime;
if (sleepDeficitNs >= MIN_SLEEP_NS) {
long sleepMs = sleepDeficitNs / 1000000;
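
The hunk above is part of a sleep-deficit throttle: when the observed rate i / elapsed exceeds the target, a per-record sleep is banked and repaid in one longer sleep once it crosses a minimum. A self-contained sketch of the technique; NS_PER_SEC and MIN_SLEEP_NS mirror fields in ProducerPerformance, but their values here are assumptions:

    // Self-contained sleep-deficit throttle (constants and loop bounds are illustrative).
    public class ThrottleSketch {
        private static final long NS_PER_SEC = 1000L * 1000L * 1000L;
        private static final long MIN_SLEEP_NS = 2L * 1000L * 1000L; // assumed minimum sleep

        public static void main(String[] args) throws InterruptedException {
            int throughput = 1000; // target records/sec
            long sleepTime = NS_PER_SEC / throughput;
            long sleepDeficitNs = 0;
            long start = System.currentTimeMillis();
            for (int i = 0; i < 10000; i++) {
                long sendStart = System.currentTimeMillis();
                float elapsed = (sendStart - start) / 1000.f;
                // If we are ahead of the target rate, bank a per-record sleep and
                // pay the whole deficit back in one longer sleep.
                if (elapsed > 0 && i / elapsed > throughput) {
                    sleepDeficitNs += sleepTime;
                    if (sleepDeficitNs >= MIN_SLEEP_NS) {
                        long sleepMs = sleepDeficitNs / 1000000;
                        long sleepNs = sleepDeficitNs - sleepMs * 1000000;
                        Thread.sleep(sleepMs, (int) sleepNs);
                        sleepDeficitNs = 0;
                    }
                }
                // ... send one record here ...
            }
        }
    }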
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index d7ccbcd..8fcd291 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -40,8 +40,8 @@ public final class Cluster {
this.nodes = Collections.unmodifiableList(copy);
this.nodesById = new HashMap<Integer, Node>();
- for(Node node: nodes)
- this.nodesById.put(node.id(), node);
+ for (Node node: nodes)
+ this.nodesById.put(node.id(), node);
// index the partitions by topic/partition for quick lookup
this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/MetricName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 7e977e9..04b4a09 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -90,8 +90,8 @@ public final class MetricName {
throw new IllegalArgumentException("keyValue needs to be specified in paris");
Map<String, String> tags = new HashMap<String, String>();
- for (int i=0; i<(keyValue.length / 2); i++)
- tags.put(keyValue[i], keyValue[i+1]);
+ for (int i = 0; i < keyValue.length / 2; i++)
+ tags.put(keyValue[i], keyValue[i + 1]);
return tags;
}
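
The loop above must step two at a time so each key is paired with the value that follows it (pairing keyValue[i] with keyValue[i + 1] for i up to length / 2 would mix values into keys). A quick standalone check of the pairwise form; the tag names are illustrative:

    // Standalone check of the pairwise key/value loop (tag names are illustrative).
    import java.util.HashMap;
    import java.util.Map;

    public class TagsSketch {
        static Map<String, String> getTags(String... keyValue) {
            if (keyValue.length % 2 != 0)
                throw new IllegalArgumentException("keyValue needs to be specified in pairs");
            Map<String, String> tags = new HashMap<String, String>();
            for (int i = 0; i < keyValue.length; i += 2)
                tags.put(keyValue[i], keyValue[i + 1]);
            return tags;
        }

        public static void main(String[] args) {
            // Prints {client-id=producer-1, node-id=node-0} (order may vary).
            System.out.println(getTags("client-id", "producer-1", "node-id", "node-0"));
        }
    }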
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index 28562f9..321da8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -72,7 +72,7 @@ public class PartitionInfo {
return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s",
topic,
partition,
- leader == null? "none" : leader.id(),
+ leader == null ? "none" : leader.id(),
fmtNodeIds(replicas),
fmtNodeIds(inSyncReplicas));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 38ce10b..8523333 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -53,6 +53,7 @@ public class ConfigDef {
/**
* Returns an unmodifiable set of the property names defined in this {@linkplain ConfigDef}
+ *
* @return new unmodifiable {@link Set} instance containing the keys
*/
public Set<String> names() {
@@ -61,6 +62,7 @@ public class ConfigDef {
/**
* Define a new configuration
+ *
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
@@ -69,16 +71,23 @@ public class ConfigDef {
* @param documentation The documentation string for the config
* @return This ConfigDef so you can chain calls
*/
- public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+ public ConfigDef define(String name,
+ Type type,
+ Object defaultValue,
+ Validator validator,
+ Importance importance,
+ String documentation) {
if (configKeys.containsKey(name))
throw new ConfigException("Configuration " + name + " is defined twice.");
- Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
+ Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE
+ : parseType(name, defaultValue, type);
configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
return this;
}
/**
* Define a new configuration with no special validation logic
+ *
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
@@ -92,6 +101,7 @@ public class ConfigDef {
/**
* Define a required parameter with no default value
+ *
* @param name The name of the config parameter
* @param type The type of the config
* @param validator A validator to use in checking the correctness of the config
@@ -105,6 +115,7 @@ public class ConfigDef {
/**
* Define a required parameter with no default value and no special validation logic
+ *
* @param name The name of the config parameter
* @param type The type of the config
* @param importance The importance of this config: is this something you will likely need to change?
@@ -120,6 +131,7 @@ public class ConfigDef {
* that the keys of the map are strings, but the values can either be strings or they may already be of the
* appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
* programmatically constructed map.
+ *
* @param props The configs to parse and validate
* @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
* the appropriate type (int, string, etc)
@@ -132,7 +144,8 @@ public class ConfigDef {
if (props.containsKey(key.name))
value = parseType(key.name, props.get(key.name), key.type);
else if (key.defaultValue == NO_DEFAULT_VALUE)
- throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
+ throw new ConfigException("Missing required configuration \"" + key.name
+ + "\" which has no default value.");
else
value = key.defaultValue;
if (key.validator != null)
@@ -144,6 +157,7 @@ public class ConfigDef {
/**
* Parse a value according to its expected type.
+ *
* @param name The config name
* @param value The config value
* @param type The expected type
@@ -157,14 +171,13 @@ public class ConfigDef {
switch (type) {
case BOOLEAN:
if (value instanceof String) {
- if (trimmed.equalsIgnoreCase("true"))
- return true;
- else if (trimmed.equalsIgnoreCase("false"))
- return false;
- else
- throw new ConfigException(name, value, "Expected value to be either true or false");
- }
- else if (value instanceof Boolean)
+ if (trimmed.equalsIgnoreCase("true"))
+ return true;
+ else if (trimmed.equalsIgnoreCase("false"))
+ return false;
+ else
+ throw new ConfigException(name, value, "Expected value to be either true or false");
+ } else if (value instanceof Boolean)
return value;
else
throw new ConfigException(name, value, "Expected value to be either true or false");
@@ -172,7 +185,8 @@ public class ConfigDef {
if (value instanceof String)
return trimmed;
else
- throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
+ throw new ConfigException(name, value, "Expected value to be a string, but it was a "
+ + value.getClass().getName());
case INT:
if (value instanceof Integer) {
return (Integer) value;
@@ -256,6 +270,7 @@ public class ConfigDef {
/**
* A numeric range that checks only the lower bound
+ *
* @param min The minimum acceptable value
*/
public static Range atLeast(Number min) {
@@ -287,32 +302,30 @@ public class ConfigDef {
}
}
- public static class ValidString implements Validator {
- List<String> validStrings;
+ public static class ValidString implements Validator {
+ private final List<String> validStrings;
- private ValidString(List<String> validStrings) {
- this.validStrings = validStrings;
- }
+ private ValidString(List<String> validStrings) {
+ this.validStrings = validStrings;
+ }
- public static ValidString in(String... validStrings) {
- return new ValidString(Arrays.asList(validStrings));
- }
+ public static ValidString in(String... validStrings) {
+ return new ValidString(Arrays.asList(validStrings));
+ }
- @Override
- public void ensureValid(String name, Object o) {
- String s = (String) o;
- if (!validStrings.contains(s)) {
- throw new ConfigException(name,o,"String must be one of: " + Utils.join(validStrings, ", "));
- }
+ @Override
+ public void ensureValid(String name, Object o) {
+ String s = (String) o;
+ if (!validStrings.contains(s))
+ throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+ }
- }
+ public String toString() {
+ return "[" + Utils.join(validStrings, ", ") + "]";
+ }
- public String toString() {
- return "[" + Utils.join(validStrings, ", ") + "]";
}
- }
-
private static class ConfigKey {
public final String name;
public final Type type;
@@ -321,7 +334,12 @@ public class ConfigDef {
public final Validator validator;
public final Importance importance;
- public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+ public ConfigKey(String name,
+ Type type,
+ Object defaultValue,
+ Validator validator,
+ Importance importance,
+ String documentation) {
super();
this.name = name;
this.type = type;
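
A minimal sketch of the fluent define/parse flow, including the Range and ValidString validators touched above; the config names and defaults here are made up for illustration:

    // Illustrative ConfigDef usage (config names and defaults are hypothetical).
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;

    public class ConfigDefSketch {
        public static void main(String[] args) {
            ConfigDef def = new ConfigDef()
                .define("acks", Type.STRING, "1", ConfigDef.ValidString.in("0", "1", "all"),
                        Importance.HIGH, "Acknowledgement mode.")
                .define("retries", Type.INT, 0, ConfigDef.Range.atLeast(0),
                        Importance.MEDIUM, "Number of retries.");
            Properties props = new Properties();
            props.put("acks", "all");   // string values are parsed to the declared Type
            Map<String, Object> parsed = def.parse(props);
            System.out.println(parsed); // "retries" falls back to its default, 0
        }
    }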
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
index 75c80a9..a6107b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,27 +17,26 @@
package org.apache.kafka.common.errors;
/**
- * Number of insync replicas for the partition is lower than min.insync.replicas
- * This exception is raised when the low ISR size is discovered *after* the message
- * was already appended to the log. Producer retries will cause duplicates.
+ * The number of in-sync replicas for the partition is lower than min.insync.replicas. This exception is raised when
+ * the low ISR size is discovered *after* the message was already appended to the log. Producer retries will cause
+ * duplicates.
*/
public class NotEnoughReplicasAfterAppendException extends RetriableException {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public NotEnoughReplicasAfterAppendException() {
- super();
- }
+ public NotEnoughReplicasAfterAppendException() {
+ super();
+ }
- public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
- super(message,cause);
- }
+ public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
+ super(message, cause);
+ }
- public NotEnoughReplicasAfterAppendException(String message) {
- super(message);
- }
+ public NotEnoughReplicasAfterAppendException(String message) {
+ super(message);
+ }
- public NotEnoughReplicasAfterAppendException(Throwable cause) {
- super(cause);
- }
+ public NotEnoughReplicasAfterAppendException(Throwable cause) {
+ super(cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
index 486d515..1573227 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,21 +20,21 @@ package org.apache.kafka.common.errors;
* The number of in-sync replicas for the partition is lower than min.insync.replicas
*/
public class NotEnoughReplicasException extends RetriableException {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public NotEnoughReplicasException() {
- super();
- }
+ public NotEnoughReplicasException() {
+ super();
+ }
- public NotEnoughReplicasException(String message, Throwable cause) {
- super(message, cause);
- }
+ public NotEnoughReplicasException(String message, Throwable cause) {
+ super(message, cause);
+ }
- public NotEnoughReplicasException(String message) {
- super(message);
- }
+ public NotEnoughReplicasException(String message) {
+ super(message);
+ }
- public NotEnoughReplicasException(Throwable cause) {
- super(cause);
- }
+ public NotEnoughReplicasException(Throwable cause) {
+ super(cause);
+ }
}