You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/02/04 05:58:40 UTC
[1/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
Repository: kafka
Updated Branches:
refs/heads/trunk f1ba4ff87 -> 1c6d5bbac
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8a305b0..69530c1 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -29,7 +29,7 @@ public class Utils {
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)");
- public static String NL = System.getProperty("line.separator");
+ public static final String NL = System.getProperty("line.separator");
/**
* Turn the given UTF8 byte array into a string
@@ -87,10 +87,10 @@ public class Utils {
* @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
*/
public static int readUnsignedIntLE(InputStream in) throws IOException {
- return (in.read() << 8*0)
- | (in.read() << 8*1)
- | (in.read() << 8*2)
- | (in.read() << 8*3);
+ return (in.read() << 8 * 0)
+ | (in.read() << 8 * 1)
+ | (in.read() << 8 * 2)
+ | (in.read() << 8 * 3);
}
/**
@@ -102,10 +102,10 @@ public class Utils {
* @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
*/
public static int readUnsignedIntLE(byte[] buffer, int offset) {
- return (buffer[offset++] << 8*0)
- | (buffer[offset++] << 8*1)
- | (buffer[offset++] << 8*2)
- | (buffer[offset] << 8*3);
+ return (buffer[offset++] << 8 * 0)
+ | (buffer[offset++] << 8 * 1)
+ | (buffer[offset++] << 8 * 2)
+ | (buffer[offset] << 8 * 3);
}
/**
@@ -136,10 +136,10 @@ public class Utils {
* @param value The value to write
*/
public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
- out.write(value >>> 8*0);
- out.write(value >>> 8*1);
- out.write(value >>> 8*2);
- out.write(value >>> 8*3);
+ out.write(value >>> 8 * 0);
+ out.write(value >>> 8 * 1);
+ out.write(value >>> 8 * 2);
+ out.write(value >>> 8 * 3);
}
/**
@@ -151,10 +151,10 @@ public class Utils {
* @param value The value to write
*/
public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
- buffer[offset++] = (byte) (value >>> 8*0);
- buffer[offset++] = (byte) (value >>> 8*1);
- buffer[offset++] = (byte) (value >>> 8*2);
- buffer[offset] = (byte) (value >>> 8*3);
+ buffer[offset++] = (byte) (value >>> 8 * 0);
+ buffer[offset++] = (byte) (value >>> 8 * 1);
+ buffer[offset++] = (byte) (value >>> 8 * 2);
+ buffer[offset] = (byte) (value >>> 8 * 3);
}
@@ -285,7 +285,7 @@ public class Utils {
case 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8;
case 1:
- h ^= (data[length & ~3] & 0xff);
+ h ^= data[length & ~3] & 0xff;
h *= m;
}
@@ -348,11 +348,11 @@ public class Utils {
public static <T> String join(Collection<T> list, String seperator) {
StringBuilder sb = new StringBuilder();
Iterator<T> iter = list.iterator();
- while(iter.hasNext()) {
+ while (iter.hasNext()) {
sb.append(iter.next());
- if(iter.hasNext())
- sb.append(seperator);
+ if (iter.hasNext())
+ sb.append(seperator);
}
- return sb.toString();
+ return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
new file mode 100644
index 0000000..13ce519
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class ClientUtilsTest {
+
+ @Test
+ public void testParseAndValidateAddresses() {
+ check("127.0.0.1:8000");
+ check("mydomain.com:8080");
+ check("[::1]:8000");
+ check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testNoPort() {
+ check("127.0.0.1");
+ }
+
+ private void check(String... url) {
+ ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 67bee40..8f1a7a6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kafka.clients;
import java.util.ArrayDeque;
@@ -65,7 +81,7 @@ public class MockClient implements KafkaClient {
@Override
public List<ClientResponse> poll(long timeoutMs, long now) {
- for(ClientResponse response: this.responses)
+ for (ClientResponse response: this.responses)
if (response.request().hasCallback())
response.request().callback().onComplete(response);
List<ClientResponse> copy = new ArrayList<ClientResponse>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 5debcd6..8b27889 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kafka.clients;
import static org.junit.Assert.assertEquals;
@@ -9,7 +25,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index e51d2df..677edd3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kafka.clients.consumer;
import static org.junit.Assert.*;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 864f1c7..090087a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kafka.clients.consumer.internals;
import static org.junit.Assert.*;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
index 77b23e7..4ae43ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
@@ -107,7 +107,7 @@ public class BufferPoolTest {
private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
final CountDownLatch latch = new CountDownLatch(1);
- new Thread() {
+ Thread thread = new Thread() {
public void run() {
try {
latch.await();
@@ -116,13 +116,14 @@ public class BufferPoolTest {
}
pool.deallocate(buffer);
}
- }.start();
+ };
+ thread.start();
return latch;
}
private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
final CountDownLatch completed = new CountDownLatch(1);
- new Thread() {
+ Thread thread = new Thread() {
public void run() {
try {
pool.allocate(size);
@@ -132,7 +133,8 @@ public class BufferPoolTest {
completed.countDown();
}
}
- }.start();
+ };
+ thread.start();
return completed;
}
@@ -172,12 +174,12 @@ public class BufferPoolTest {
try {
for (int i = 0; i < iterations; i++) {
int size;
- if (TestUtils.random.nextBoolean())
+ if (TestUtils.RANDOM.nextBoolean())
// allocate poolable size
size = pool.poolableSize();
else
// allocate a random size
- size = TestUtils.random.nextInt((int) pool.totalMemory());
+ size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
ByteBuffer buffer = pool.allocate(size);
pool.deallocate(buffer);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 74605c3..743aa7e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -12,7 +12,7 @@
*/
package org.apache.kafka.clients.producer;
-import org.apache.kafka.clients.producer.internals.Metadata;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.test.TestUtils;
@@ -49,7 +49,7 @@ public class MetadataTest {
}
/**
- * Tests that {@link org.apache.kafka.clients.producer.internals.Metadata#awaitUpdate(int, long)} doesn't
+ * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't
* wait forever with a max timeout value of 0
*
* @throws Exception
@@ -68,9 +68,9 @@ public class MetadataTest {
// expected
}
// now try with a higher timeout value once
- final long TWO_SECOND_WAIT = 2000;
+ final long twoSecondWait = 2000;
try {
- metadata.awaitUpdate(metadata.requestUpdate(), TWO_SECOND_WAIT);
+ metadata.awaitUpdate(metadata.requestUpdate(), twoSecondWait);
fail("Wait on metadata update was expected to timeout, but it didn't");
} catch (TimeoutException te) {
// expected
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index d3377ef..75513b0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -32,6 +32,7 @@ public class MockProducerTest {
private String topic = "topic";
@Test
+ @SuppressWarnings("unchecked")
public void testAutoCompleteMock() throws Exception {
MockProducer producer = new MockProducer(true);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index 82d8083..29c8417 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -31,7 +31,7 @@ public class PartitionerTest {
private Node node0 = new Node(0, "localhost", 99);
private Node node1 = new Node(1, "localhost", 100);
private Node node2 = new Node(2, "localhost", 101);
- private Node[] nodes = new Node[] { node0, node1, node2 };
+ private Node[] nodes = new Node[] {node0, node1, node2};
private String topic = "test";
private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes),
new PartitionInfo(topic, 1, node1, nodes, nodes),
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index e2bb8da..8333863 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -128,6 +128,7 @@ public class RecordAccumulatorTest {
assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
}
+ @SuppressWarnings("unused")
@Test
public void testStressfulSituation() throws Exception {
final int numThreads = 5;
@@ -194,7 +195,7 @@ public class RecordAccumulatorTest {
assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
// Add data for another partition on node1, enough to make data sendable immediately
- for (int i = 0; i < appends+1; i++)
+ for (int i = 0; i < appends + 1; i++)
accum.append(tp2, key, value, CompressionType.NONE, null);
result = accum.ready(cluster, time.milliseconds());
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index a3700a6..1e5d1c2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -78,7 +78,7 @@ public class RecordSendTest {
/* create a new request result that will be completed after the given timeout */
public ProduceRequestResult asyncRequest(final long baseOffset, final RuntimeException error, final long timeout) {
final ProduceRequestResult request = new ProduceRequestResult();
- new Thread() {
+ Thread thread = new Thread() {
public void run() {
try {
sleep(timeout);
@@ -87,7 +87,8 @@ public class RecordSendTest {
e.printStackTrace();
}
}
- }.start();
+ };
+ thread.start();
return request;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 888b929..558942a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -21,8 +21,8 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
@@ -147,8 +147,8 @@ public class SenderTest {
partResp.set("partition", part);
partResp.set("error_code", (short) error);
partResp.set("base_offset", offset);
- response.set("partition_responses", new Object[] { partResp });
- struct.set("responses", new Object[] { response });
+ response.set("partition_responses", new Object[] {partResp});
+ struct.set("responses", new Object[] {response});
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 3cfd36d..66442ed 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -14,87 +14,67 @@ package org.apache.kafka.common.config;
import static org.junit.Assert.fail;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.junit.Test;
public class AbstractConfigTest {
- @Test
- public void testConfiguredInstances() {
- testValidInputs("");
- testValidInputs("org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter");
- testValidInputs("org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter,org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter");
- testInvalidInputs(",");
- testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
- testInvalidInputs("test1,test2");
- testInvalidInputs("org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter,");
- }
-
- private void testValidInputs(String configValue) {
- Properties props = new Properties();
- props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
- TestConfig config = new TestConfig(props);
- try {
- config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
- } catch (ConfigException e) {
- fail("No exceptions are expected here, valid props are :" + props);
- }
- }
-
- private void testInvalidInputs(String configValue) {
- Properties props = new Properties();
- props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
- TestConfig config = new TestConfig(props);
- try {
- config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
- fail("Expected a config exception due to invalid props :" + props);
- } catch (ConfigException e) {
- // this is good
+ @Test
+ public void testConfiguredInstances() {
+ testValidInputs("");
+ testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter");
+ testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter");
+ testInvalidInputs(",");
+ testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
+ testInvalidInputs("test1,test2");
+ testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
}
- }
-
- private static class TestConfig extends AbstractConfig {
-
- private static final ConfigDef config;
-
- public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
- private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters.";
- static {
- config = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG,
- Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC);
+ private void testValidInputs(String configValue) {
+ Properties props = new Properties();
+ props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
+ TestConfig config = new TestConfig(props);
+ try {
+ config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+ } catch (ConfigException e) {
+ fail("No exceptions are expected here, valid props are :" + props);
+ }
}
- public TestConfig(Map<? extends Object, ? extends Object> props) {
- super(config, props);
+ private void testInvalidInputs(String configValue) {
+ Properties props = new Properties();
+ props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
+ TestConfig config = new TestConfig(props);
+ try {
+ config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+ fail("Expected a config exception due to invalid props :" + props);
+ } catch (ConfigException e) {
+ // this is good
+ }
}
- }
-
- public static class TestMetricsReporter implements MetricsReporter {
- @Override
- public void configure(Map<String, ?> configs) {
- }
+ private static class TestConfig extends AbstractConfig {
- @Override
- public void init(List<KafkaMetric> metrics) {
-}
+ private static final ConfigDef CONFIG;
- @Override
- public void metricChange(KafkaMetric metric) {
- }
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
+ private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters.";
+
+ static {
+ CONFIG = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG,
+ Type.LIST,
+ "",
+ Importance.LOW,
+ METRIC_REPORTER_CLASSES_DOC);
+ }
- @Override
- public void close() {
+ public TestConfig(Map<? extends Object, ? extends Object> props) {
+ super(CONFIG, props);
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 16d3fed..44c2ef0 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -16,7 +16,6 @@ import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -110,7 +109,7 @@ public class ConfigDefTest {
@Test(expected = ConfigException.class)
public void testInvalidDefaultRange() {
- new ConfigDef().define("name", Type.INT, -1, Range.between(0,10), Importance.HIGH, "docs");
+ new ConfigDef().define("name", Type.INT, -1, Range.between(0, 10), Importance.HIGH, "docs");
}
@Test(expected = ConfigException.class)
@@ -120,7 +119,7 @@ public class ConfigDefTest {
@Test
public void testValidators() {
- testValidators(Type.INT, Range.between(0,10), 5, new Object[]{1, 5, 9}, new Object[]{-1, 11});
+ testValidators(Type.INT, Range.between(0, 10), 5, new Object[]{1, 5, 9}, new Object[]{-1, 11});
testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs"});
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
new file mode 100644
index 0000000..7c7ead1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.List;
+import java.util.Map;
+
+public class FakeMetricsReporter implements MetricsReporter {
+
+ @Override
+ public void configure(Map<String, ?> configs) {}
+
+ @Override
+ public void init(List<KafkaMetric> metrics) {}
+
+ @Override
+ public void metricChange(KafkaMetric metric) {}
+
+ @Override
+ public void close() {}
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 998a57c..544e120 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -36,7 +36,7 @@ import org.junit.Test;
public class MetricsTest {
- private static double EPS = 0.000001;
+ private static final double EPS = 0.000001;
MockTime time = new MockTime();
Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time);
@@ -71,7 +71,7 @@ public class MetricsTest {
s.add(new MetricName("test.count", "grp1"), new Count());
s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
new Percentile(new MetricName("test.median", "grp1"), 50.0),
- new Percentile(new MetricName("test.perc99_9", "grp1"),99.9)));
+ new Percentile(new MetricName("test.perc99_9", "grp1"), 99.9)));
Sensor s2 = metrics.sensor("test.sensor2");
s2.add(new MetricName("s2.total", "grp1"), new Total());
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
index 3be6b2d..a55cc32 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
import java.util.Random;
-import org.apache.kafka.common.metrics.stats.Histogram;
import org.apache.kafka.common.metrics.stats.Histogram.BinScheme;
import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme;
import org.apache.kafka.common.metrics.stats.Histogram.LinearBinScheme;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index a14659a..0d030bc 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -40,7 +40,6 @@ import org.junit.Test;
*/
public class SelectorTest {
- private static final List<NetworkSend> EMPTY = new ArrayList<NetworkSend>();
private static final int BUFFER_SIZE = 4 * 1024;
private EchoServer server;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 4480e9b..8b92634 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -23,12 +23,6 @@ import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
import org.junit.Before;
import org.junit.Test;
@@ -53,8 +47,8 @@ public class ProtocolSerializationTest {
.set("int64", (long) 1)
.set("string", "1")
.set("bytes", "1".getBytes())
- .set("array", new Object[] { 1 });
- this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] { 1, 2, 3 }));
+ .set("array", new Object[] {1});
+ this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] {1, 2, 3}));
}
@Test
@@ -68,9 +62,9 @@ public class ProtocolSerializationTest {
check(Type.STRING, "A\u00ea\u00f1\u00fcC");
check(Type.BYTES, ByteBuffer.allocate(0));
check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes()));
- check(new ArrayOf(Type.INT32), new Object[] { 1, 2, 3, 4 });
+ check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4});
check(new ArrayOf(Type.STRING), new Object[] {});
- check(new ArrayOf(Type.STRING), new Object[] { "hello", "there", "beautiful" });
+ check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"});
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 94a1112..e343327 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -71,7 +71,7 @@ public class MemoryRecordsTest {
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<Object[]>();
for (CompressionType type: CompressionType.values())
- values.add(new Object[] { type });
+ values.add(new Object[] {type});
return values;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index 2765913..957fc8f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -63,7 +63,7 @@ public class RecordTest {
@Test
public void testChecksum() {
assertEquals(record.checksum(), record.computeChecksum());
- assertEquals(record.checksum(), record.computeChecksum(
+ assertEquals(record.checksum(), Record.computeChecksum(
this.key == null ? null : this.key.array(),
this.value == null ? null : this.value.array(),
this.compression, 0, -1));
@@ -102,7 +102,7 @@ public class RecordTest {
for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
for (CompressionType compression : CompressionType.values())
- values.add(new Object[] { key, value, compression });
+ values.add(new Object[] {key, value, compression});
return values;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index df37fc6..13237fd 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
import org.junit.Test;
import java.lang.reflect.Method;
@@ -31,7 +32,7 @@ import static org.junit.Assert.assertEquals;
public class RequestResponseTest {
@Test
- public void testSerialization() throws Exception{
+ public void testSerialization() throws Exception {
List<AbstractRequestResponse> requestList = Arrays.asList(
createRequestHeader(),
createResponseHeader(),
@@ -67,7 +68,7 @@ public class RequestResponseTest {
}
private AbstractRequestResponse createRequestHeader() {
- return new RequestHeader((short)10, (short)1, "", 10);
+ return new RequestHeader((short) 10, (short) 1, "", 10);
}
private AbstractRequestResponse createResponseHeader() {
@@ -79,7 +80,7 @@ public class RequestResponseTest {
}
private AbstractRequestResponse createConsumerMetadataResponse() {
- return new ConsumerMetadataResponse((short)1, new Node(10, "host1", 2014));
+ return new ConsumerMetadataResponse((short) 1, new Node(10, "host1", 2014));
}
private AbstractRequestResponse createFetchRequest() {
@@ -91,7 +92,7 @@ public class RequestResponseTest {
private AbstractRequestResponse createFetchResponse() {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
- responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData((short)0, 1000000, ByteBuffer.allocate(10)));
+ responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
return new FetchResponse(responseData);
}
@@ -100,7 +101,7 @@ public class RequestResponseTest {
}
private AbstractRequestResponse createHeartBeatResponse() {
- return new HeartbeatResponse((short)0);
+ return new HeartbeatResponse(Errors.NONE.code());
}
private AbstractRequestResponse createJoinGroupRequest() {
@@ -108,7 +109,7 @@ public class RequestResponseTest {
}
private AbstractRequestResponse createJoinGroupResponse() {
- return new JoinGroupResponse((short)0, 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
+ return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
}
private AbstractRequestResponse createListOffsetRequest() {
@@ -119,7 +120,7 @@ public class RequestResponseTest {
private AbstractRequestResponse createListOffsetResponse() {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
- responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData((short)0, Arrays.asList(100L)));
+ responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
return new ListOffsetResponse(responseData);
}
@@ -145,7 +146,7 @@ public class RequestResponseTest {
private AbstractRequestResponse createOffsetCommitResponse() {
Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
- responseData.put(new TopicPartition("test", 0), (short)0);
+ responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
return new OffsetCommitResponse(responseData);
}
@@ -155,19 +156,19 @@ public class RequestResponseTest {
private AbstractRequestResponse createOffsetFetchResponse() {
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
- responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", (short)0));
+ responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code()));
return new OffsetFetchResponse(responseData);
}
private AbstractRequestResponse createProduceRequest() {
Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>();
produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
- return new ProduceRequest((short)0, 5000, produceData);
+ return new ProduceRequest(Errors.NONE.code(), 5000, produceData);
}
private AbstractRequestResponse createProduceResponse() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
- responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse((short) 0, 10000));
+ responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
return new ProduceResponse(responseData);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index b6e1497..f5cd61c 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -35,13 +35,13 @@ public class SerializationTest {
@Test
public void testStringSerializer() {
- String str = "my string";
+ String str = "my string";
String mytopic = "testTopic";
List<String> encodings = new ArrayList<String>();
encodings.add("UTF8");
encodings.add("UTF-16");
- for ( String encoding : encodings) {
+ for (String encoding : encodings) {
SerDeser<String> serDeser = getStringSerDeser(encoding);
Serializer<String> serializer = serDeser.serializer;
Deserializer<String> deserializer = serDeser.deserializer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
deleted file mode 100644
index 6e37ea5..0000000
--- a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-public class ClientUtilsTest {
-
- @Test
- public void testParseAndValidateAddresses() {
- check("127.0.0.1:8000");
- check("mydomain.com:8080");
- check("[::1]:8000");
- check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
- }
-
- @Test(expected = ConfigException.class)
- public void testNoPort() {
- check("127.0.0.1");
- }
-
- private void check(String... url) {
- ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
index 6b32381..c39c3cf 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
@@ -25,7 +25,7 @@ public class CrcTest {
@Test
public void testUpdate() {
- final byte bytes[] = "Any String you want".getBytes();
+ final byte[] bytes = "Any String you want".getBytes();
final int len = bytes.length;
Crc32 crc1 = new Crc32();
@@ -33,10 +33,10 @@ public class CrcTest {
Crc32 crc3 = new Crc32();
crc1.update(bytes, 0, len);
- for(int i = 0; i < len; i++)
+ for (int i = 0; i < len; i++)
crc2.update(bytes[i]);
- crc3.update(bytes, 0, len/2);
- crc3.update(bytes, len/2, len-len/2);
+ crc3.update(bytes, 0, len / 2);
+ crc3.update(bytes, len / 2, len - len / 2);
assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue());
assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue());
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index b24d4de..8cd19b2 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -162,7 +162,6 @@ public class Microbenchmarks {
for (int i = 0; i < numThreads; i++) {
threads.add(new Thread() {
public void run() {
- int sum = 0;
long start = System.nanoTime();
for (int j = 0; j < iters; j++)
map.get(keys.get(j % threads.size()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 76a17e8..20dba7b 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -35,15 +35,15 @@ import org.apache.kafka.common.PartitionInfo;
*/
public class TestUtils {
- public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
+ public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
- public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
- public static String DIGITS = "0123456789";
- public static String LETTERS_AND_DIGITS = LETTERS + DIGITS;
+ public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+ public static final String DIGITS = "0123456789";
+ public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
/* A consistent random number generator to make tests repeatable */
- public static final Random seededRandom = new Random(192348092834L);
- public static final Random random = new Random();
+ public static final Random SEEDED_RANDOM = new Random(192348092834L);
+ public static final Random RANDOM = new Random();
public static Cluster singletonCluster(String topic, int partitions) {
return clusterWith(1, topic, partitions);
@@ -92,7 +92,7 @@ public class TestUtils {
*/
public static byte[] randomBytes(int size) {
byte[] bytes = new byte[size];
- seededRandom.nextBytes(bytes);
+ SEEDED_RANDOM.nextBytes(bytes);
return bytes;
}
@@ -105,7 +105,7 @@ public class TestUtils {
public static String randomString(int len) {
StringBuilder b = new StringBuilder();
for (int i = 0; i < len; i++)
- b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
+ b.append(LETTERS_AND_DIGITS.charAt(SEEDED_RANDOM.nextInt(LETTERS_AND_DIGITS.length())));
return b.toString();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
index facf509..7f45a90 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
@@ -17,9 +17,6 @@
package kafka.javaapi.consumer;
-import kafka.common.TopicAndPartition;
-import kafka.consumer.ConsumerThreadId;
-
import java.util.Map;
import java.util.Set;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/message/CompressionFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index c721040..b047f68 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -22,7 +22,7 @@ import java.util.zip.GZIPOutputStream
import java.util.zip.GZIPInputStream
import java.io.InputStream
-import org.apache.kafka.common.message.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
+import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
object CompressionFactory {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 7909d25..026d819 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
@SuppressWarnings({"unchecked", "rawtypes"})
public class KafkaMigrationTool
{
- private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
+ private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
@@ -194,7 +194,7 @@ public class KafkaMigrationTool
kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
/** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
- logger.warn("Shallow iterator should not be used in the migration tool");
+ log.warn("Shallow iterator should not be used in the migration tool");
kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
}
Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
@@ -230,7 +230,7 @@ public class KafkaMigrationTool
try {
ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07);
} catch(Exception e) {
- logger.error("Error while shutting down Kafka consumer", e);
+ log.error("Error while shutting down Kafka consumer", e);
}
for(MigrationThread migrationThread : migrationThreads) {
migrationThread.shutdown();
@@ -241,7 +241,7 @@ public class KafkaMigrationTool
for(ProducerThread producerThread : producerThreads) {
producerThread.awaitShutdown();
}
- logger.info("Kafka migration tool shutdown successfully");
+ log.info("Kafka migration tool shutdown successfully");
}
});
@@ -266,7 +266,7 @@ public class KafkaMigrationTool
}
catch (Throwable e){
System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
- logger.error("Kafka migration tool failed: ", e);
+ log.error("Kafka migration tool failed: ", e);
}
}
@@ -388,7 +388,7 @@ public class KafkaMigrationTool
KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
if(!data.equals(shutdownMessage)) {
producer.send(data);
- if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message())));
+ if(logger.isDebugEnabled()) logger.debug(String.format("Sending message %s", new String(data.message())));
}
else
break;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/utils/Crc32.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Crc32.java b/core/src/main/scala/kafka/utils/Crc32.java
index af9fe0d..0e0e7bc 100644
--- a/core/src/main/scala/kafka/utils/Crc32.java
+++ b/core/src/main/scala/kafka/utils/Crc32.java
@@ -62,16 +62,16 @@ public class Crc32 implements Checksum {
final int c1 =(b[off+1] ^ (localCrc >>>= 8)) & 0xff;
final int c2 =(b[off+2] ^ (localCrc >>>= 8)) & 0xff;
final int c3 =(b[off+3] ^ (localCrc >>>= 8)) & 0xff;
- localCrc = (T[T8_7_start + c0] ^ T[T8_6_start + c1])
- ^ (T[T8_5_start + c2] ^ T[T8_4_start + c3]);
+ localCrc = (T[T8_7_START + c0] ^ T[T8_6_START + c1])
+ ^ (T[T8_5_START + c2] ^ T[T8_4_START + c3]);
final int c4 = b[off+4] & 0xff;
final int c5 = b[off+5] & 0xff;
final int c6 = b[off+6] & 0xff;
final int c7 = b[off+7] & 0xff;
- localCrc ^= (T[T8_3_start + c4] ^ T[T8_2_start + c5])
- ^ (T[T8_1_start + c6] ^ T[T8_0_start + c7]);
+ localCrc ^= (T[T8_3_START + c4] ^ T[T8_2_START + c5])
+ ^ (T[T8_1_START + c6] ^ T[T8_0_START + c7]);
off += 8;
len -= 8;
@@ -79,13 +79,13 @@ public class Crc32 implements Checksum {
/* loop unroll - duff's device style */
switch(len) {
- case 7: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
- case 6: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
- case 5: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
- case 4: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
- case 3: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
- case 2: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
- case 1: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+ case 7: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+ case 6: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+ case 5: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+ case 4: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+ case 3: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+ case 2: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+ case 1: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
default:
/* nothing */
}
@@ -96,21 +96,21 @@ public class Crc32 implements Checksum {
@Override
final public void update(int b) {
- crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
+ crc = (crc >>> 8) ^ T[T8_0_START + ((crc ^ b) & 0xff)];
}
/*
* CRC-32 lookup tables generated by the polynomial 0xEDB88320.
* See also TestPureJavaCrc32.Table.
*/
- private static final int T8_0_start = 0*256;
- private static final int T8_1_start = 1*256;
- private static final int T8_2_start = 2*256;
- private static final int T8_3_start = 3*256;
- private static final int T8_4_start = 4*256;
- private static final int T8_5_start = 5*256;
- private static final int T8_6_start = 6*256;
- private static final int T8_7_start = 7*256;
+ private static final int T8_0_START = 0*256;
+ private static final int T8_1_START = 1*256;
+ private static final int T8_2_START = 2*256;
+ private static final int T8_3_START = 3*256;
+ private static final int T8_4_START = 4*256;
+ private static final int T8_5_START = 5*256;
+ private static final int T8_6_START = 6*256;
+ private static final int T8_7_START = 7*256;
private static final int[] T = new int[] {
/* T8_0 */
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index c79192c..0d66fe5 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -22,7 +22,7 @@ import kafka.javaapi.FetchResponse;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
@@ -71,10 +71,9 @@ public class SimpleConsumerDemo {
printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0));
System.out.println("Testing single multi-fetch");
- Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>() {{
- put(KafkaProperties.topic2, new ArrayList<Integer>(){{ add(0); }});
- put(KafkaProperties.topic3, new ArrayList<Integer>(){{ add(0); }});
- }};
+ Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
+ topicMap.put(KafkaProperties.topic2, Collections.singletonList(0));
+ topicMap.put(KafkaProperties.topic3, Collections.singletonList(0));
req = new FetchRequestBuilder()
.clientId(KafkaProperties.clientId)
.addFetch(KafkaProperties.topic2, 0, 0L, 100)
[2/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
Posted by jk...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
index 047ca98..5b86700 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
@@ -30,7 +30,7 @@ public class Crc32 implements Checksum {
/**
* Compute the CRC32 of the byte array
- *
+ *
* @param bytes The array to compute the checksum for
* @return The CRC32
*/
@@ -40,7 +40,7 @@ public class Crc32 implements Checksum {
/**
* Compute the CRC32 of the segment of the byte array given by the specified size and offset
- *
+ *
* @param bytes The bytes to checksum
* @param offset the offset at which to begin checksumming
* @param size the number of bytes to checksum
@@ -79,14 +79,14 @@ public class Crc32 implements Checksum {
final int c1 = (b[off + 1] ^ (localCrc >>>= 8)) & 0xff;
final int c2 = (b[off + 2] ^ (localCrc >>>= 8)) & 0xff;
final int c3 = (b[off + 3] ^ (localCrc >>>= 8)) & 0xff;
- localCrc = (T[T8_7_start + c0] ^ T[T8_6_start + c1]) ^ (T[T8_5_start + c2] ^ T[T8_4_start + c3]);
+ localCrc = (T[T8_7_START + c0] ^ T[T8_6_START + c1]) ^ (T[T8_5_START + c2] ^ T[T8_4_START + c3]);
final int c4 = b[off + 4] & 0xff;
final int c5 = b[off + 5] & 0xff;
final int c6 = b[off + 6] & 0xff;
final int c7 = b[off + 7] & 0xff;
- localCrc ^= (T[T8_3_start + c4] ^ T[T8_2_start + c5]) ^ (T[T8_1_start + c6] ^ T[T8_0_start + c7]);
+ localCrc ^= (T[T8_3_START + c4] ^ T[T8_2_START + c5]) ^ (T[T8_1_START + c6] ^ T[T8_0_START + c7]);
off += 8;
len -= 8;
@@ -95,19 +95,19 @@ public class Crc32 implements Checksum {
/* loop unroll - duff's device style */
switch (len) {
case 7:
- localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+ localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
case 6:
- localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+ localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
case 5:
- localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+ localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
case 4:
- localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+ localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
case 3:
- localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+ localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
case 2:
- localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+ localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
case 1:
- localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+ localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
default:
/* nothing */
}
@@ -118,7 +118,7 @@ public class Crc32 implements Checksum {
@Override
final public void update(int b) {
- crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
+ crc = (crc >>> 8) ^ T[T8_0_START + ((crc ^ b) & 0xff)];
}
/**
@@ -131,2075 +131,257 @@ public class Crc32 implements Checksum {
update((byte) input /* >> 0 */);
}
-
-
/*
* CRC-32 lookup tables generated by the polynomial 0xEDB88320. See also TestPureJavaCrc32.Table.
*/
- private static final int T8_0_start = 0 * 256;
- private static final int T8_1_start = 1 * 256;
- private static final int T8_2_start = 2 * 256;
- private static final int T8_3_start = 3 * 256;
- private static final int T8_4_start = 4 * 256;
- private static final int T8_5_start = 5 * 256;
- private static final int T8_6_start = 6 * 256;
- private static final int T8_7_start = 7 * 256;
+ private static final int T8_0_START = 0 * 256;
+ private static final int T8_1_START = 1 * 256;
+ private static final int T8_2_START = 2 * 256;
+ private static final int T8_3_START = 3 * 256;
+ private static final int T8_4_START = 4 * 256;
+ private static final int T8_5_START = 5 * 256;
+ private static final int T8_6_START = 6 * 256;
+ private static final int T8_7_START = 7 * 256;
private static final int[] T = new int[] {
- /* T8_0 */
- 0x00000000,
- 0x77073096,
- 0xEE0E612C,
- 0x990951BA,
- 0x076DC419,
- 0x706AF48F,
- 0xE963A535,
- 0x9E6495A3,
- 0x0EDB8832,
- 0x79DCB8A4,
- 0xE0D5E91E,
- 0x97D2D988,
- 0x09B64C2B,
- 0x7EB17CBD,
- 0xE7B82D07,
- 0x90BF1D91,
- 0x1DB71064,
- 0x6AB020F2,
- 0xF3B97148,
- 0x84BE41DE,
- 0x1ADAD47D,
- 0x6DDDE4EB,
- 0xF4D4B551,
- 0x83D385C7,
- 0x136C9856,
- 0x646BA8C0,
- 0xFD62F97A,
- 0x8A65C9EC,
- 0x14015C4F,
- 0x63066CD9,
- 0xFA0F3D63,
- 0x8D080DF5,
- 0x3B6E20C8,
- 0x4C69105E,
- 0xD56041E4,
- 0xA2677172,
- 0x3C03E4D1,
- 0x4B04D447,
- 0xD20D85FD,
- 0xA50AB56B,
- 0x35B5A8FA,
- 0x42B2986C,
- 0xDBBBC9D6,
- 0xACBCF940,
- 0x32D86CE3,
- 0x45DF5C75,
- 0xDCD60DCF,
- 0xABD13D59,
- 0x26D930AC,
- 0x51DE003A,
- 0xC8D75180,
- 0xBFD06116,
- 0x21B4F4B5,
- 0x56B3C423,
- 0xCFBA9599,
- 0xB8BDA50F,
- 0x2802B89E,
- 0x5F058808,
- 0xC60CD9B2,
- 0xB10BE924,
- 0x2F6F7C87,
- 0x58684C11,
- 0xC1611DAB,
- 0xB6662D3D,
- 0x76DC4190,
- 0x01DB7106,
- 0x98D220BC,
- 0xEFD5102A,
- 0x71B18589,
- 0x06B6B51F,
- 0x9FBFE4A5,
- 0xE8B8D433,
- 0x7807C9A2,
- 0x0F00F934,
- 0x9609A88E,
- 0xE10E9818,
- 0x7F6A0DBB,
- 0x086D3D2D,
- 0x91646C97,
- 0xE6635C01,
- 0x6B6B51F4,
- 0x1C6C6162,
- 0x856530D8,
- 0xF262004E,
- 0x6C0695ED,
- 0x1B01A57B,
- 0x8208F4C1,
- 0xF50FC457,
- 0x65B0D9C6,
- 0x12B7E950,
- 0x8BBEB8EA,
- 0xFCB9887C,
- 0x62DD1DDF,
- 0x15DA2D49,
- 0x8CD37CF3,
- 0xFBD44C65,
- 0x4DB26158,
- 0x3AB551CE,
- 0xA3BC0074,
- 0xD4BB30E2,
- 0x4ADFA541,
- 0x3DD895D7,
- 0xA4D1C46D,
- 0xD3D6F4FB,
- 0x4369E96A,
- 0x346ED9FC,
- 0xAD678846,
- 0xDA60B8D0,
- 0x44042D73,
- 0x33031DE5,
- 0xAA0A4C5F,
- 0xDD0D7CC9,
- 0x5005713C,
- 0x270241AA,
- 0xBE0B1010,
- 0xC90C2086,
- 0x5768B525,
- 0x206F85B3,
- 0xB966D409,
- 0xCE61E49F,
- 0x5EDEF90E,
- 0x29D9C998,
- 0xB0D09822,
- 0xC7D7A8B4,
- 0x59B33D17,
- 0x2EB40D81,
- 0xB7BD5C3B,
- 0xC0BA6CAD,
- 0xEDB88320,
- 0x9ABFB3B6,
- 0x03B6E20C,
- 0x74B1D29A,
- 0xEAD54739,
- 0x9DD277AF,
- 0x04DB2615,
- 0x73DC1683,
- 0xE3630B12,
- 0x94643B84,
- 0x0D6D6A3E,
- 0x7A6A5AA8,
- 0xE40ECF0B,
- 0x9309FF9D,
- 0x0A00AE27,
- 0x7D079EB1,
- 0xF00F9344,
- 0x8708A3D2,
- 0x1E01F268,
- 0x6906C2FE,
- 0xF762575D,
- 0x806567CB,
- 0x196C3671,
- 0x6E6B06E7,
- 0xFED41B76,
- 0x89D32BE0,
- 0x10DA7A5A,
- 0x67DD4ACC,
- 0xF9B9DF6F,
- 0x8EBEEFF9,
- 0x17B7BE43,
- 0x60B08ED5,
- 0xD6D6A3E8,
- 0xA1D1937E,
- 0x38D8C2C4,
- 0x4FDFF252,
- 0xD1BB67F1,
- 0xA6BC5767,
- 0x3FB506DD,
- 0x48B2364B,
- 0xD80D2BDA,
- 0xAF0A1B4C,
- 0x36034AF6,
- 0x41047A60,
- 0xDF60EFC3,
- 0xA867DF55,
- 0x316E8EEF,
- 0x4669BE79,
- 0xCB61B38C,
- 0xBC66831A,
- 0x256FD2A0,
- 0x5268E236,
- 0xCC0C7795,
- 0xBB0B4703,
- 0x220216B9,
- 0x5505262F,
- 0xC5BA3BBE,
- 0xB2BD0B28,
- 0x2BB45A92,
- 0x5CB36A04,
- 0xC2D7FFA7,
- 0xB5D0CF31,
- 0x2CD99E8B,
- 0x5BDEAE1D,
- 0x9B64C2B0,
- 0xEC63F226,
- 0x756AA39C,
- 0x026D930A,
- 0x9C0906A9,
- 0xEB0E363F,
- 0x72076785,
- 0x05005713,
- 0x95BF4A82,
- 0xE2B87A14,
- 0x7BB12BAE,
- 0x0CB61B38,
- 0x92D28E9B,
- 0xE5D5BE0D,
- 0x7CDCEFB7,
- 0x0BDBDF21,
- 0x86D3D2D4,
- 0xF1D4E242,
- 0x68DDB3F8,
- 0x1FDA836E,
- 0x81BE16CD,
- 0xF6B9265B,
- 0x6FB077E1,
- 0x18B74777,
- 0x88085AE6,
- 0xFF0F6A70,
- 0x66063BCA,
- 0x11010B5C,
- 0x8F659EFF,
- 0xF862AE69,
- 0x616BFFD3,
- 0x166CCF45,
- 0xA00AE278,
- 0xD70DD2EE,
- 0x4E048354,
- 0x3903B3C2,
- 0xA7672661,
- 0xD06016F7,
- 0x4969474D,
- 0x3E6E77DB,
- 0xAED16A4A,
- 0xD9D65ADC,
- 0x40DF0B66,
- 0x37D83BF0,
- 0xA9BCAE53,
- 0xDEBB9EC5,
- 0x47B2CF7F,
- 0x30B5FFE9,
- 0xBDBDF21C,
- 0xCABAC28A,
- 0x53B39330,
- 0x24B4A3A6,
- 0xBAD03605,
- 0xCDD70693,
- 0x54DE5729,
- 0x23D967BF,
- 0xB3667A2E,
- 0xC4614AB8,
- 0x5D681B02,
- 0x2A6F2B94,
- 0xB40BBE37,
- 0xC30C8EA1,
- 0x5A05DF1B,
- 0x2D02EF8D,
- /* T8_1 */
- 0x00000000,
- 0x191B3141,
- 0x32366282,
- 0x2B2D53C3,
- 0x646CC504,
- 0x7D77F445,
- 0x565AA786,
- 0x4F4196C7,
- 0xC8D98A08,
- 0xD1C2BB49,
- 0xFAEFE88A,
- 0xE3F4D9CB,
- 0xACB54F0C,
- 0xB5AE7E4D,
- 0x9E832D8E,
- 0x87981CCF,
- 0x4AC21251,
- 0x53D92310,
- 0x78F470D3,
- 0x61EF4192,
- 0x2EAED755,
- 0x37B5E614,
- 0x1C98B5D7,
- 0x05838496,
- 0x821B9859,
- 0x9B00A918,
- 0xB02DFADB,
- 0xA936CB9A,
- 0xE6775D5D,
- 0xFF6C6C1C,
- 0xD4413FDF,
- 0xCD5A0E9E,
- 0x958424A2,
- 0x8C9F15E3,
- 0xA7B24620,
- 0xBEA97761,
- 0xF1E8E1A6,
- 0xE8F3D0E7,
- 0xC3DE8324,
- 0xDAC5B265,
- 0x5D5DAEAA,
- 0x44469FEB,
- 0x6F6BCC28,
- 0x7670FD69,
- 0x39316BAE,
- 0x202A5AEF,
- 0x0B07092C,
- 0x121C386D,
- 0xDF4636F3,
- 0xC65D07B2,
- 0xED705471,
- 0xF46B6530,
- 0xBB2AF3F7,
- 0xA231C2B6,
- 0x891C9175,
- 0x9007A034,
- 0x179FBCFB,
- 0x0E848DBA,
- 0x25A9DE79,
- 0x3CB2EF38,
- 0x73F379FF,
- 0x6AE848BE,
- 0x41C51B7D,
- 0x58DE2A3C,
- 0xF0794F05,
- 0xE9627E44,
- 0xC24F2D87,
- 0xDB541CC6,
- 0x94158A01,
- 0x8D0EBB40,
- 0xA623E883,
- 0xBF38D9C2,
- 0x38A0C50D,
- 0x21BBF44C,
- 0x0A96A78F,
- 0x138D96CE,
- 0x5CCC0009,
- 0x45D73148,
- 0x6EFA628B,
- 0x77E153CA,
- 0xBABB5D54,
- 0xA3A06C15,
- 0x888D3FD6,
- 0x91960E97,
- 0xDED79850,
- 0xC7CCA911,
- 0xECE1FAD2,
- 0xF5FACB93,
- 0x7262D75C,
- 0x6B79E61D,
- 0x4054B5DE,
- 0x594F849F,
- 0x160E1258,
- 0x0F152319,
- 0x243870DA,
- 0x3D23419B,
- 0x65FD6BA7,
- 0x7CE65AE6,
- 0x57CB0925,
- 0x4ED03864,
- 0x0191AEA3,
- 0x188A9FE2,
- 0x33A7CC21,
- 0x2ABCFD60,
- 0xAD24E1AF,
- 0xB43FD0EE,
- 0x9F12832D,
- 0x8609B26C,
- 0xC94824AB,
- 0xD05315EA,
- 0xFB7E4629,
- 0xE2657768,
- 0x2F3F79F6,
- 0x362448B7,
- 0x1D091B74,
- 0x04122A35,
- 0x4B53BCF2,
- 0x52488DB3,
- 0x7965DE70,
- 0x607EEF31,
- 0xE7E6F3FE,
- 0xFEFDC2BF,
- 0xD5D0917C,
- 0xCCCBA03D,
- 0x838A36FA,
- 0x9A9107BB,
- 0xB1BC5478,
- 0xA8A76539,
- 0x3B83984B,
- 0x2298A90A,
- 0x09B5FAC9,
- 0x10AECB88,
- 0x5FEF5D4F,
- 0x46F46C0E,
- 0x6DD93FCD,
- 0x74C20E8C,
- 0xF35A1243,
- 0xEA412302,
- 0xC16C70C1,
- 0xD8774180,
- 0x9736D747,
- 0x8E2DE606,
- 0xA500B5C5,
- 0xBC1B8484,
- 0x71418A1A,
- 0x685ABB5B,
- 0x4377E898,
- 0x5A6CD9D9,
- 0x152D4F1E,
- 0x0C367E5F,
- 0x271B2D9C,
- 0x3E001CDD,
- 0xB9980012,
- 0xA0833153,
- 0x8BAE6290,
- 0x92B553D1,
- 0xDDF4C516,
- 0xC4EFF457,
- 0xEFC2A794,
- 0xF6D996D5,
- 0xAE07BCE9,
- 0xB71C8DA8,
- 0x9C31DE6B,
- 0x852AEF2A,
- 0xCA6B79ED,
- 0xD37048AC,
- 0xF85D1B6F,
- 0xE1462A2E,
- 0x66DE36E1,
- 0x7FC507A0,
- 0x54E85463,
- 0x4DF36522,
- 0x02B2F3E5,
- 0x1BA9C2A4,
- 0x30849167,
- 0x299FA026,
- 0xE4C5AEB8,
- 0xFDDE9FF9,
- 0xD6F3CC3A,
- 0xCFE8FD7B,
- 0x80A96BBC,
- 0x99B25AFD,
- 0xB29F093E,
- 0xAB84387F,
- 0x2C1C24B0,
- 0x350715F1,
- 0x1E2A4632,
- 0x07317773,
- 0x4870E1B4,
- 0x516BD0F5,
- 0x7A468336,
- 0x635DB277,
- 0xCBFAD74E,
- 0xD2E1E60F,
- 0xF9CCB5CC,
- 0xE0D7848D,
- 0xAF96124A,
- 0xB68D230B,
- 0x9DA070C8,
- 0x84BB4189,
- 0x03235D46,
- 0x1A386C07,
- 0x31153FC4,
- 0x280E0E85,
- 0x674F9842,
- 0x7E54A903,
- 0x5579FAC0,
- 0x4C62CB81,
- 0x8138C51F,
- 0x9823F45E,
- 0xB30EA79D,
- 0xAA1596DC,
- 0xE554001B,
- 0xFC4F315A,
- 0xD7626299,
- 0xCE7953D8,
- 0x49E14F17,
- 0x50FA7E56,
- 0x7BD72D95,
- 0x62CC1CD4,
- 0x2D8D8A13,
- 0x3496BB52,
- 0x1FBBE891,
- 0x06A0D9D0,
- 0x5E7EF3EC,
- 0x4765C2AD,
- 0x6C48916E,
- 0x7553A02F,
- 0x3A1236E8,
- 0x230907A9,
- 0x0824546A,
- 0x113F652B,
- 0x96A779E4,
- 0x8FBC48A5,
- 0xA4911B66,
- 0xBD8A2A27,
- 0xF2CBBCE0,
- 0xEBD08DA1,
- 0xC0FDDE62,
- 0xD9E6EF23,
- 0x14BCE1BD,
- 0x0DA7D0FC,
- 0x268A833F,
- 0x3F91B27E,
- 0x70D024B9,
- 0x69CB15F8,
- 0x42E6463B,
- 0x5BFD777A,
- 0xDC656BB5,
- 0xC57E5AF4,
- 0xEE530937,
- 0xF7483876,
- 0xB809AEB1,
- 0xA1129FF0,
- 0x8A3FCC33,
- 0x9324FD72,
- /* T8_2 */
- 0x00000000,
- 0x01C26A37,
- 0x0384D46E,
- 0x0246BE59,
- 0x0709A8DC,
- 0x06CBC2EB,
- 0x048D7CB2,
- 0x054F1685,
- 0x0E1351B8,
- 0x0FD13B8F,
- 0x0D9785D6,
- 0x0C55EFE1,
- 0x091AF964,
- 0x08D89353,
- 0x0A9E2D0A,
- 0x0B5C473D,
- 0x1C26A370,
- 0x1DE4C947,
- 0x1FA2771E,
- 0x1E601D29,
- 0x1B2F0BAC,
- 0x1AED619B,
- 0x18ABDFC2,
- 0x1969B5F5,
- 0x1235F2C8,
- 0x13F798FF,
- 0x11B126A6,
- 0x10734C91,
- 0x153C5A14,
- 0x14FE3023,
- 0x16B88E7A,
- 0x177AE44D,
- 0x384D46E0,
- 0x398F2CD7,
- 0x3BC9928E,
- 0x3A0BF8B9,
- 0x3F44EE3C,
- 0x3E86840B,
- 0x3CC03A52,
- 0x3D025065,
- 0x365E1758,
- 0x379C7D6F,
- 0x35DAC336,
- 0x3418A901,
- 0x3157BF84,
- 0x3095D5B3,
- 0x32D36BEA,
- 0x331101DD,
- 0x246BE590,
- 0x25A98FA7,
- 0x27EF31FE,
- 0x262D5BC9,
- 0x23624D4C,
- 0x22A0277B,
- 0x20E69922,
- 0x2124F315,
- 0x2A78B428,
- 0x2BBADE1F,
- 0x29FC6046,
- 0x283E0A71,
- 0x2D711CF4,
- 0x2CB376C3,
- 0x2EF5C89A,
- 0x2F37A2AD,
- 0x709A8DC0,
- 0x7158E7F7,
- 0x731E59AE,
- 0x72DC3399,
- 0x7793251C,
- 0x76514F2B,
- 0x7417F172,
- 0x75D59B45,
- 0x7E89DC78,
- 0x7F4BB64F,
- 0x7D0D0816,
- 0x7CCF6221,
- 0x798074A4,
- 0x78421E93,
- 0x7A04A0CA,
- 0x7BC6CAFD,
- 0x6CBC2EB0,
- 0x6D7E4487,
- 0x6F38FADE,
- 0x6EFA90E9,
- 0x6BB5866C,
- 0x6A77EC5B,
- 0x68315202,
- 0x69F33835,
- 0x62AF7F08,
- 0x636D153F,
- 0x612BAB66,
- 0x60E9C151,
- 0x65A6D7D4,
- 0x6464BDE3,
- 0x662203BA,
- 0x67E0698D,
- 0x48D7CB20,
- 0x4915A117,
- 0x4B531F4E,
- 0x4A917579,
- 0x4FDE63FC,
- 0x4E1C09CB,
- 0x4C5AB792,
- 0x4D98DDA5,
- 0x46C49A98,
- 0x4706F0AF,
- 0x45404EF6,
- 0x448224C1,
- 0x41CD3244,
- 0x400F5873,
- 0x4249E62A,
- 0x438B8C1D,
- 0x54F16850,
- 0x55330267,
- 0x5775BC3E,
- 0x56B7D609,
- 0x53F8C08C,
- 0x523AAABB,
- 0x507C14E2,
- 0x51BE7ED5,
- 0x5AE239E8,
- 0x5B2053DF,
- 0x5966ED86,
- 0x58A487B1,
- 0x5DEB9134,
- 0x5C29FB03,
- 0x5E6F455A,
- 0x5FAD2F6D,
- 0xE1351B80,
- 0xE0F771B7,
- 0xE2B1CFEE,
- 0xE373A5D9,
- 0xE63CB35C,
- 0xE7FED96B,
- 0xE5B86732,
- 0xE47A0D05,
- 0xEF264A38,
- 0xEEE4200F,
- 0xECA29E56,
- 0xED60F461,
- 0xE82FE2E4,
- 0xE9ED88D3,
- 0xEBAB368A,
- 0xEA695CBD,
- 0xFD13B8F0,
- 0xFCD1D2C7,
- 0xFE976C9E,
- 0xFF5506A9,
- 0xFA1A102C,
- 0xFBD87A1B,
- 0xF99EC442,
- 0xF85CAE75,
- 0xF300E948,
- 0xF2C2837F,
- 0xF0843D26,
- 0xF1465711,
- 0xF4094194,
- 0xF5CB2BA3,
- 0xF78D95FA,
- 0xF64FFFCD,
- 0xD9785D60,
- 0xD8BA3757,
- 0xDAFC890E,
- 0xDB3EE339,
- 0xDE71F5BC,
- 0xDFB39F8B,
- 0xDDF521D2,
- 0xDC374BE5,
- 0xD76B0CD8,
- 0xD6A966EF,
- 0xD4EFD8B6,
- 0xD52DB281,
- 0xD062A404,
- 0xD1A0CE33,
- 0xD3E6706A,
- 0xD2241A5D,
- 0xC55EFE10,
- 0xC49C9427,
- 0xC6DA2A7E,
- 0xC7184049,
- 0xC25756CC,
- 0xC3953CFB,
- 0xC1D382A2,
- 0xC011E895,
- 0xCB4DAFA8,
- 0xCA8FC59F,
- 0xC8C97BC6,
- 0xC90B11F1,
- 0xCC440774,
- 0xCD866D43,
- 0xCFC0D31A,
- 0xCE02B92D,
- 0x91AF9640,
- 0x906DFC77,
- 0x922B422E,
- 0x93E92819,
- 0x96A63E9C,
- 0x976454AB,
- 0x9522EAF2,
- 0x94E080C5,
- 0x9FBCC7F8,
- 0x9E7EADCF,
- 0x9C381396,
- 0x9DFA79A1,
- 0x98B56F24,
- 0x99770513,
- 0x9B31BB4A,
- 0x9AF3D17D,
- 0x8D893530,
- 0x8C4B5F07,
- 0x8E0DE15E,
- 0x8FCF8B69,
- 0x8A809DEC,
- 0x8B42F7DB,
- 0x89044982,
- 0x88C623B5,
- 0x839A6488,
- 0x82580EBF,
- 0x801EB0E6,
- 0x81DCDAD1,
- 0x8493CC54,
- 0x8551A663,
- 0x8717183A,
- 0x86D5720D,
- 0xA9E2D0A0,
- 0xA820BA97,
- 0xAA6604CE,
- 0xABA46EF9,
- 0xAEEB787C,
- 0xAF29124B,
- 0xAD6FAC12,
- 0xACADC625,
- 0xA7F18118,
- 0xA633EB2F,
- 0xA4755576,
- 0xA5B73F41,
- 0xA0F829C4,
- 0xA13A43F3,
- 0xA37CFDAA,
- 0xA2BE979D,
- 0xB5C473D0,
- 0xB40619E7,
- 0xB640A7BE,
- 0xB782CD89,
- 0xB2CDDB0C,
- 0xB30FB13B,
- 0xB1490F62,
- 0xB08B6555,
- 0xBBD72268,
- 0xBA15485F,
- 0xB853F606,
- 0xB9919C31,
- 0xBCDE8AB4,
- 0xBD1CE083,
- 0xBF5A5EDA,
- 0xBE9834ED,
- /* T8_3 */
- 0x00000000,
- 0xB8BC6765,
- 0xAA09C88B,
- 0x12B5AFEE,
- 0x8F629757,
- 0x37DEF032,
- 0x256B5FDC,
- 0x9DD738B9,
- 0xC5B428EF,
- 0x7D084F8A,
- 0x6FBDE064,
- 0xD7018701,
- 0x4AD6BFB8,
- 0xF26AD8DD,
- 0xE0DF7733,
- 0x58631056,
- 0x5019579F,
- 0xE8A530FA,
- 0xFA109F14,
- 0x42ACF871,
- 0xDF7BC0C8,
- 0x67C7A7AD,
- 0x75720843,
- 0xCDCE6F26,
- 0x95AD7F70,
- 0x2D111815,
- 0x3FA4B7FB,
- 0x8718D09E,
- 0x1ACFE827,
- 0xA2738F42,
- 0xB0C620AC,
- 0x087A47C9,
- 0xA032AF3E,
- 0x188EC85B,
- 0x0A3B67B5,
- 0xB28700D0,
- 0x2F503869,
- 0x97EC5F0C,
- 0x8559F0E2,
- 0x3DE59787,
- 0x658687D1,
- 0xDD3AE0B4,
- 0xCF8F4F5A,
- 0x7733283F,
- 0xEAE41086,
- 0x525877E3,
- 0x40EDD80D,
- 0xF851BF68,
- 0xF02BF8A1,
- 0x48979FC4,
- 0x5A22302A,
- 0xE29E574F,
- 0x7F496FF6,
- 0xC7F50893,
- 0xD540A77D,
- 0x6DFCC018,
- 0x359FD04E,
- 0x8D23B72B,
- 0x9F9618C5,
- 0x272A7FA0,
- 0xBAFD4719,
- 0x0241207C,
- 0x10F48F92,
- 0xA848E8F7,
- 0x9B14583D,
- 0x23A83F58,
- 0x311D90B6,
- 0x89A1F7D3,
- 0x1476CF6A,
- 0xACCAA80F,
- 0xBE7F07E1,
- 0x06C36084,
- 0x5EA070D2,
- 0xE61C17B7,
- 0xF4A9B859,
- 0x4C15DF3C,
- 0xD1C2E785,
- 0x697E80E0,
- 0x7BCB2F0E,
- 0xC377486B,
- 0xCB0D0FA2,
- 0x73B168C7,
- 0x6104C729,
- 0xD9B8A04C,
- 0x446F98F5,
- 0xFCD3FF90,
- 0xEE66507E,
- 0x56DA371B,
- 0x0EB9274D,
- 0xB6054028,
- 0xA4B0EFC6,
- 0x1C0C88A3,
- 0x81DBB01A,
- 0x3967D77F,
- 0x2BD27891,
- 0x936E1FF4,
- 0x3B26F703,
- 0x839A9066,
- 0x912F3F88,
- 0x299358ED,
- 0xB4446054,
- 0x0CF80731,
- 0x1E4DA8DF,
- 0xA6F1CFBA,
- 0xFE92DFEC,
- 0x462EB889,
- 0x549B1767,
- 0xEC277002,
- 0x71F048BB,
- 0xC94C2FDE,
- 0xDBF98030,
- 0x6345E755,
- 0x6B3FA09C,
- 0xD383C7F9,
- 0xC1366817,
- 0x798A0F72,
- 0xE45D37CB,
- 0x5CE150AE,
- 0x4E54FF40,
- 0xF6E89825,
- 0xAE8B8873,
- 0x1637EF16,
- 0x048240F8,
- 0xBC3E279D,
- 0x21E91F24,
- 0x99557841,
- 0x8BE0D7AF,
- 0x335CB0CA,
- 0xED59B63B,
- 0x55E5D15E,
- 0x47507EB0,
- 0xFFEC19D5,
- 0x623B216C,
- 0xDA874609,
- 0xC832E9E7,
- 0x708E8E82,
- 0x28ED9ED4,
- 0x9051F9B1,
- 0x82E4565F,
- 0x3A58313A,
- 0xA78F0983,
- 0x1F336EE6,
- 0x0D86C108,
- 0xB53AA66D,
- 0xBD40E1A4,
- 0x05FC86C1,
- 0x1749292F,
- 0xAFF54E4A,
- 0x322276F3,
- 0x8A9E1196,
- 0x982BBE78,
- 0x2097D91D,
- 0x78F4C94B,
- 0xC048AE2E,
- 0xD2FD01C0,
- 0x6A4166A5,
- 0xF7965E1C,
- 0x4F2A3979,
- 0x5D9F9697,
- 0xE523F1F2,
- 0x4D6B1905,
- 0xF5D77E60,
- 0xE762D18E,
- 0x5FDEB6EB,
- 0xC2098E52,
- 0x7AB5E937,
- 0x680046D9,
- 0xD0BC21BC,
- 0x88DF31EA,
- 0x3063568F,
- 0x22D6F961,
- 0x9A6A9E04,
- 0x07BDA6BD,
- 0xBF01C1D8,
- 0xADB46E36,
- 0x15080953,
- 0x1D724E9A,
- 0xA5CE29FF,
- 0xB77B8611,
- 0x0FC7E174,
- 0x9210D9CD,
- 0x2AACBEA8,
- 0x38191146,
- 0x80A57623,
- 0xD8C66675,
- 0x607A0110,
- 0x72CFAEFE,
- 0xCA73C99B,
- 0x57A4F122,
- 0xEF189647,
- 0xFDAD39A9,
- 0x45115ECC,
- 0x764DEE06,
- 0xCEF18963,
- 0xDC44268D,
- 0x64F841E8,
- 0xF92F7951,
- 0x41931E34,
- 0x5326B1DA,
- 0xEB9AD6BF,
- 0xB3F9C6E9,
- 0x0B45A18C,
- 0x19F00E62,
- 0xA14C6907,
- 0x3C9B51BE,
- 0x842736DB,
- 0x96929935,
- 0x2E2EFE50,
- 0x2654B999,
- 0x9EE8DEFC,
- 0x8C5D7112,
- 0x34E11677,
- 0xA9362ECE,
- 0x118A49AB,
- 0x033FE645,
- 0xBB838120,
- 0xE3E09176,
- 0x5B5CF613,
- 0x49E959FD,
- 0xF1553E98,
- 0x6C820621,
- 0xD43E6144,
- 0xC68BCEAA,
- 0x7E37A9CF,
- 0xD67F4138,
- 0x6EC3265D,
- 0x7C7689B3,
- 0xC4CAEED6,
- 0x591DD66F,
- 0xE1A1B10A,
- 0xF3141EE4,
- 0x4BA87981,
- 0x13CB69D7,
- 0xAB770EB2,
- 0xB9C2A15C,
- 0x017EC639,
- 0x9CA9FE80,
- 0x241599E5,
- 0x36A0360B,
- 0x8E1C516E,
- 0x866616A7,
- 0x3EDA71C2,
- 0x2C6FDE2C,
- 0x94D3B949,
- 0x090481F0,
- 0xB1B8E695,
- 0xA30D497B,
- 0x1BB12E1E,
- 0x43D23E48,
- 0xFB6E592D,
- 0xE9DBF6C3,
- 0x516791A6,
- 0xCCB0A91F,
- 0x740CCE7A,
- 0x66B96194,
- 0xDE0506F1,
- /* T8_4 */
- 0x00000000,
- 0x3D6029B0,
- 0x7AC05360,
- 0x47A07AD0,
- 0xF580A6C0,
- 0xC8E08F70,
- 0x8F40F5A0,
- 0xB220DC10,
- 0x30704BC1,
- 0x0D106271,
- 0x4AB018A1,
- 0x77D03111,
- 0xC5F0ED01,
- 0xF890C4B1,
- 0xBF30BE61,
- 0x825097D1,
- 0x60E09782,
- 0x5D80BE32,
- 0x1A20C4E2,
- 0x2740ED52,
- 0x95603142,
- 0xA80018F2,
- 0xEFA06222,
- 0xD2C04B92,
- 0x5090DC43,
- 0x6DF0F5F3,
- 0x2A508F23,
- 0x1730A693,
- 0xA5107A83,
- 0x98705333,
- 0xDFD029E3,
- 0xE2B00053,
- 0xC1C12F04,
- 0xFCA106B4,
- 0xBB017C64,
- 0x866155D4,
- 0x344189C4,
- 0x0921A074,
- 0x4E81DAA4,
- 0x73E1F314,
- 0xF1B164C5,
- 0xCCD14D75,
- 0x8B7137A5,
- 0xB6111E15,
- 0x0431C205,
- 0x3951EBB5,
- 0x7EF19165,
- 0x4391B8D5,
- 0xA121B886,
- 0x9C419136,
- 0xDBE1EBE6,
- 0xE681C256,
- 0x54A11E46,
- 0x69C137F6,
- 0x2E614D26,
- 0x13016496,
- 0x9151F347,
- 0xAC31DAF7,
- 0xEB91A027,
- 0xD6F18997,
- 0x64D15587,
- 0x59B17C37,
- 0x1E1106E7,
- 0x23712F57,
- 0x58F35849,
- 0x659371F9,
- 0x22330B29,
- 0x1F532299,
- 0xAD73FE89,
- 0x9013D739,
- 0xD7B3ADE9,
- 0xEAD38459,
- 0x68831388,
- 0x55E33A38,
- 0x124340E8,
- 0x2F236958,
- 0x9D03B548,
- 0xA0639CF8,
- 0xE7C3E628,
- 0xDAA3CF98,
- 0x3813CFCB,
- 0x0573E67B,
- 0x42D39CAB,
- 0x7FB3B51B,
- 0xCD93690B,
- 0xF0F340BB,
- 0xB7533A6B,
- 0x8A3313DB,
- 0x0863840A,
- 0x3503ADBA,
- 0x72A3D76A,
- 0x4FC3FEDA,
- 0xFDE322CA,
- 0xC0830B7A,
- 0x872371AA,
- 0xBA43581A,
- 0x9932774D,
- 0xA4525EFD,
- 0xE3F2242D,
- 0xDE920D9D,
- 0x6CB2D18D,
- 0x51D2F83D,
- 0x167282ED,
- 0x2B12AB5D,
- 0xA9423C8C,
- 0x9422153C,
- 0xD3826FEC,
- 0xEEE2465C,
- 0x5CC29A4C,
- 0x61A2B3FC,
- 0x2602C92C,
- 0x1B62E09C,
- 0xF9D2E0CF,
- 0xC4B2C97F,
- 0x8312B3AF,
- 0xBE729A1F,
- 0x0C52460F,
- 0x31326FBF,
- 0x7692156F,
- 0x4BF23CDF,
- 0xC9A2AB0E,
- 0xF4C282BE,
- 0xB362F86E,
- 0x8E02D1DE,
- 0x3C220DCE,
- 0x0142247E,
- 0x46E25EAE,
- 0x7B82771E,
- 0xB1E6B092,
- 0x8C869922,
- 0xCB26E3F2,
- 0xF646CA42,
- 0x44661652,
- 0x79063FE2,
- 0x3EA64532,
- 0x03C66C82,
- 0x8196FB53,
- 0xBCF6D2E3,
- 0xFB56A833,
- 0xC6368183,
- 0x74165D93,
- 0x49767423,
- 0x0ED60EF3,
- 0x33B62743,
- 0xD1062710,
- 0xEC660EA0,
- 0xABC67470,
- 0x96A65DC0,
- 0x248681D0,
- 0x19E6A860,
- 0x5E46D2B0,
- 0x6326FB00,
- 0xE1766CD1,
- 0xDC164561,
- 0x9BB63FB1,
- 0xA6D61601,
- 0x14F6CA11,
- 0x2996E3A1,
- 0x6E369971,
- 0x5356B0C1,
- 0x70279F96,
- 0x4D47B626,
- 0x0AE7CCF6,
- 0x3787E546,
- 0x85A73956,
- 0xB8C710E6,
- 0xFF676A36,
- 0xC2074386,
- 0x4057D457,
- 0x7D37FDE7,
- 0x3A978737,
- 0x07F7AE87,
- 0xB5D77297,
- 0x88B75B27,
- 0xCF1721F7,
- 0xF2770847,
- 0x10C70814,
- 0x2DA721A4,
- 0x6A075B74,
- 0x576772C4,
- 0xE547AED4,
- 0xD8278764,
- 0x9F87FDB4,
- 0xA2E7D404,
- 0x20B743D5,
- 0x1DD76A65,
- 0x5A7710B5,
- 0x67173905,
- 0xD537E515,
- 0xE857CCA5,
- 0xAFF7B675,
- 0x92979FC5,
- 0xE915E8DB,
- 0xD475C16B,
- 0x93D5BBBB,
- 0xAEB5920B,
- 0x1C954E1B,
- 0x21F567AB,
- 0x66551D7B,
- 0x5B3534CB,
- 0xD965A31A,
- 0xE4058AAA,
- 0xA3A5F07A,
- 0x9EC5D9CA,
- 0x2CE505DA,
- 0x11852C6A,
- 0x562556BA,
- 0x6B457F0A,
- 0x89F57F59,
- 0xB49556E9,
- 0xF3352C39,
- 0xCE550589,
- 0x7C75D999,
- 0x4115F029,
- 0x06B58AF9,
- 0x3BD5A349,
- 0xB9853498,
- 0x84E51D28,
- 0xC34567F8,
- 0xFE254E48,
- 0x4C059258,
- 0x7165BBE8,
- 0x36C5C138,
- 0x0BA5E888,
- 0x28D4C7DF,
- 0x15B4EE6F,
- 0x521494BF,
- 0x6F74BD0F,
- 0xDD54611F,
- 0xE03448AF,
- 0xA794327F,
- 0x9AF41BCF,
- 0x18A48C1E,
- 0x25C4A5AE,
- 0x6264DF7E,
- 0x5F04F6CE,
- 0xED242ADE,
- 0xD044036E,
- 0x97E479BE,
- 0xAA84500E,
- 0x4834505D,
- 0x755479ED,
- 0x32F4033D,
- 0x0F942A8D,
- 0xBDB4F69D,
- 0x80D4DF2D,
- 0xC774A5FD,
- 0xFA148C4D,
- 0x78441B9C,
- 0x4524322C,
- 0x028448FC,
- 0x3FE4614C,
- 0x8DC4BD5C,
- 0xB0A494EC,
- 0xF704EE3C,
- 0xCA64C78C,
- /* T8_5 */
- 0x00000000,
- 0xCB5CD3A5,
- 0x4DC8A10B,
- 0x869472AE,
- 0x9B914216,
- 0x50CD91B3,
- 0xD659E31D,
- 0x1D0530B8,
- 0xEC53826D,
- 0x270F51C8,
- 0xA19B2366,
- 0x6AC7F0C3,
- 0x77C2C07B,
- 0xBC9E13DE,
- 0x3A0A6170,
- 0xF156B2D5,
- 0x03D6029B,
- 0xC88AD13E,
- 0x4E1EA390,
- 0x85427035,
- 0x9847408D,
- 0x531B9328,
- 0xD58FE186,
- 0x1ED33223,
- 0xEF8580F6,
- 0x24D95353,
- 0xA24D21FD,
- 0x6911F258,
- 0x7414C2E0,
- 0xBF481145,
- 0x39DC63EB,
- 0xF280B04E,
- 0x07AC0536,
- 0xCCF0D693,
- 0x4A64A43D,
- 0x81387798,
- 0x9C3D4720,
- 0x57619485,
- 0xD1F5E62B,
- 0x1AA9358E,
- 0xEBFF875B,
- 0x20A354FE,
- 0xA6372650,
- 0x6D6BF5F5,
- 0x706EC54D,
- 0xBB3216E8,
- 0x3DA66446,
- 0xF6FAB7E3,
- 0x047A07AD,
- 0xCF26D408,
- 0x49B2A6A6,
- 0x82EE7503,
- 0x9FEB45BB,
- 0x54B7961E,
- 0xD223E4B0,
- 0x197F3715,
- 0xE82985C0,
- 0x23755665,
- 0xA5E124CB,
- 0x6EBDF76E,
- 0x73B8C7D6,
- 0xB8E41473,
- 0x3E7066DD,
- 0xF52CB578,
- 0x0F580A6C,
- 0xC404D9C9,
- 0x4290AB67,
- 0x89CC78C2,
- 0x94C9487A,
- 0x5F959BDF,
- 0xD901E971,
- 0x125D3AD4,
- 0xE30B8801,
- 0x28575BA4,
- 0xAEC3290A,
- 0x659FFAAF,
- 0x789ACA17,
- 0xB3C619B2,
- 0x35526B1C,
- 0xFE0EB8B9,
- 0x0C8E08F7,
- 0xC7D2DB52,
- 0x4146A9FC,
- 0x8A1A7A59,
- 0x971F4AE1,
- 0x5C439944,
- 0xDAD7EBEA,
- 0x118B384F,
- 0xE0DD8A9A,
- 0x2B81593F,
- 0xAD152B91,
- 0x6649F834,
- 0x7B4CC88C,
- 0xB0101B29,
- 0x36846987,
- 0xFDD8BA22,
- 0x08F40F5A,
- 0xC3A8DCFF,
- 0x453CAE51,
- 0x8E607DF4,
- 0x93654D4C,
- 0x58399EE9,
- 0xDEADEC47,
- 0x15F13FE2,
- 0xE4A78D37,
- 0x2FFB5E92,
- 0xA96F2C3C,
- 0x6233FF99,
- 0x7F36CF21,
- 0xB46A1C84,
- 0x32FE6E2A,
- 0xF9A2BD8F,
- 0x0B220DC1,
- 0xC07EDE64,
- 0x46EAACCA,
- 0x8DB67F6F,
- 0x90B34FD7,
- 0x5BEF9C72,
- 0xDD7BEEDC,
- 0x16273D79,
- 0xE7718FAC,
- 0x2C2D5C09,
- 0xAAB92EA7,
- 0x61E5FD02,
- 0x7CE0CDBA,
- 0xB7BC1E1F,
- 0x31286CB1,
- 0xFA74BF14,
- 0x1EB014D8,
- 0xD5ECC77D,
- 0x5378B5D3,
- 0x98246676,
- 0x852156CE,
- 0x4E7D856B,
- 0xC8E9F7C5,
- 0x03B52460,
- 0xF2E396B5,
- 0x39BF4510,
- 0xBF2B37BE,
- 0x7477E41B,
- 0x6972D4A3,
- 0xA22E0706,
- 0x24BA75A8,
- 0xEFE6A60D,
-
<TRUNCATED>
[3/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
Posted by jk...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
index 4c99d4a..1651e75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -43,6 +43,6 @@ public class ConsumerMetadataRequest extends AbstractRequestResponse {
}
public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
- return new ConsumerMetadataRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
index 173333b..0c250c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
@@ -65,6 +65,6 @@ public class ConsumerMetadataResponse extends AbstractRequestResponse {
}
public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
- return new ConsumerMetadataResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new ConsumerMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 2529a09..721e7d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -135,6 +135,6 @@ public class FetchRequest extends AbstractRequestResponse {
}
public static FetchRequest parse(ByteBuffer buffer) {
- return new FetchRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index c1e5f44..e67c4c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -106,6 +106,6 @@ public class FetchResponse extends AbstractRequestResponse {
}
public static FetchResponse parse(ByteBuffer buffer) {
- return new FetchResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index cfdb5de..6943878 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -60,6 +60,6 @@ public class HeartbeatRequest extends AbstractRequestResponse {
}
public static HeartbeatRequest parse(ByteBuffer buffer) {
- return new HeartbeatRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index ea964f7..0057496 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -41,6 +41,6 @@ public class HeartbeatResponse extends AbstractRequestResponse {
}
public static HeartbeatResponse parse(ByteBuffer buffer) {
- return new HeartbeatResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new HeartbeatResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index a1d48c9..8c50e9b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -83,6 +83,6 @@ public class JoinGroupRequest extends AbstractRequestResponse {
}
public static JoinGroupRequest parse(ByteBuffer buffer) {
- return new JoinGroupRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 1e9f349..52b1803 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -98,6 +98,6 @@ public class JoinGroupResponse extends AbstractRequestResponse {
}
public static JoinGroupResponse parse(ByteBuffer buffer) {
- return new JoinGroupResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new JoinGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 05c5fed..e5dc92e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -58,7 +58,7 @@ public class ListOffsetRequest extends AbstractRequestResponse {
}
public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
- this(-1, offsetData);
+ this(-1, offsetData);
}
public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
@@ -114,6 +114,6 @@ public class ListOffsetRequest extends AbstractRequestResponse {
}
public static ListOffsetRequest parse(ByteBuffer buffer) {
- return new ListOffsetRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index b2e473e..cfac47a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -104,6 +104,6 @@ public class ListOffsetResponse extends AbstractRequestResponse {
}
public static ListOffsetResponse parse(ByteBuffer buffer) {
- return new ListOffsetResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new ListOffsetResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 0186783..5d5f52c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -48,6 +48,6 @@ public class MetadataRequest extends AbstractRequestResponse {
}
public static MetadataRequest parse(ByteBuffer buffer) {
- return new MetadataRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 13daf59..90f3141 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.types.Struct;
public class MetadataResponse extends AbstractRequestResponse {
- private static Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
private static final String BROKERS_KEY_NAME = "brokers";
private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
@@ -69,12 +69,12 @@ public class MetadataResponse extends AbstractRequestResponse {
List<Struct> topicArray = new ArrayList<Struct>();
for (String topic: cluster.topics()) {
Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
- topicData.set(TOPIC_ERROR_CODE_KEY_NAME, (short)0); // no error
+ topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
topicData.set(TOPIC_KEY_NAME, topic);
List<Struct> partitionArray = new ArrayList<Struct>();
for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
- partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, (short)0); // no error
+ partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
ArrayList<Integer> replicas = new ArrayList<Integer>();
@@ -148,6 +148,6 @@ public class MetadataResponse extends AbstractRequestResponse {
}
public static MetadataResponse parse(ByteBuffer buffer) {
- return new MetadataResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 4fb48c8..94e9d37 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -172,10 +172,10 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
- return new OffsetCommitRequest(((Struct) schema.read(buffer)));
+ return new OffsetCommitRequest((Struct) schema.read(buffer));
}
public static OffsetCommitRequest parse(ByteBuffer buffer) {
- return new OffsetCommitRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new OffsetCommitRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 2ab1dc6..4d3b9ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -83,6 +83,6 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
}
public static OffsetCommitResponse parse(ByteBuffer buffer) {
- return new OffsetCommitResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new OffsetCommitResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 333483f..16c807c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -83,7 +83,7 @@ public class OffsetFetchRequest extends AbstractRequestResponse {
}
}
groupId = struct.getString(GROUP_ID_KEY_NAME);
- }
+ }
public String groupId() {
return groupId;
@@ -94,6 +94,6 @@ public class OffsetFetchRequest extends AbstractRequestResponse {
}
public static OffsetFetchRequest parse(ByteBuffer buffer) {
- return new OffsetFetchRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new OffsetFetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 04c88c0..edbed58 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -108,6 +108,6 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
}
public static OffsetFetchResponse parse(ByteBuffer buffer) {
- return new OffsetFetchResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new OffsetFetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 03a0ab1..995f89f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -101,6 +101,6 @@ public class ProduceRequest extends AbstractRequestResponse {
}
public static ProduceRequest parse(ByteBuffer buffer) {
- return new ProduceRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new ProduceRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index e42d7db..a00dcdf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -108,6 +108,6 @@ public class ProduceResponse extends AbstractRequestResponse {
}
public static ProduceResponse parse(ByteBuffer buffer) {
- return new ProduceResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
+ return new ProduceResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index f459a2a..14bcde7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -26,10 +26,10 @@ import org.apache.kafka.common.protocol.types.Struct;
*/
public class RequestHeader extends AbstractRequestResponse {
- private static Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
- private static Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
- private static Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
- private static Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
+ private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
+ private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
+ private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
+ private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
private final short apiKey;
private final short apiVersion;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
index dd63853..e8a7ef9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types.Struct;
*/
public class ResponseHeader extends AbstractRequestResponse {
- private static Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
+ private static final Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
private final int correlationId;
@@ -50,7 +50,7 @@ public class ResponseHeader extends AbstractRequestResponse {
}
public static ResponseHeader parse(ByteBuffer buffer) {
- return new ResponseHeader(((Struct) Protocol.RESPONSE_HEADER.read(buffer)));
+ return new ResponseHeader((Struct) Protocol.RESPONSE_HEADER.read(buffer));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
deleted file mode 100644
index b987e7f..0000000
--- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.ConfigException;
-
-import static org.apache.kafka.common.utils.Utils.getHost;
-import static org.apache.kafka.common.utils.Utils.getPort;
-
-public class ClientUtils {
-
- public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
- List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
- for (String url : urls) {
- if (url != null && url.length() > 0) {
- String host = getHost(url);
- Integer port = getPort(url);
- if (host == null || port == null)
- throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
- try {
- InetSocketAddress address = new InetSocketAddress(host, port);
- if (address.isUnresolved())
- throw new ConfigException("DNS resolution failed for url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
- addresses.add(address);
- } catch (NumberFormatException e) {
- throw new ConfigException("Invalid port in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
- }
- }
- }
- if (addresses.size() < 1)
- throw new ConfigException("No bootstrap urls given in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
- return addresses;
- }
-}
\ No newline at end of file
[6/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
Posted by jk...@apache.org.
KAFKA-1915: Add checkstyle for java code.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c6d5bba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c6d5bba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c6d5bba
Branch: refs/heads/trunk
Commit: 1c6d5bbac672cbf49591aed0889510b10e3285fa
Parents: f1ba4ff
Author: Jay Kreps <ja...@gmail.com>
Authored: Mon Feb 2 21:36:21 2015 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 3 09:16:55 2015 -0800
----------------------------------------------------------------------
README.md | 3 +
build.gradle | 6 +
checkstyle/checkstyle.xml | 83 +
checkstyle/import-control.xml | 100 +
.../org/apache/kafka/clients/ClientUtils.java | 48 +
.../kafka/clients/ClusterConnectionStates.java | 9 +-
.../java/org/apache/kafka/clients/Metadata.java | 170 ++
.../org/apache/kafka/clients/NetworkClient.java | 3 +-
.../kafka/clients/consumer/CommitType.java | 12 +
.../kafka/clients/consumer/ConsumerConfig.java | 8 +-
.../kafka/clients/consumer/ConsumerRecords.java | 40 +-
.../kafka/clients/consumer/KafkaConsumer.java | 43 +-
.../clients/consumer/internals/Heartbeat.java | 12 +
.../NoOpConsumerRebalanceCallback.java | 4 +-
.../consumer/internals/SubscriptionState.java | 16 +-
.../kafka/clients/producer/KafkaProducer.java | 25 +-
.../apache/kafka/clients/producer/Producer.java | 6 +-
.../kafka/clients/producer/ProducerConfig.java | 10 +-
.../clients/producer/internals/BufferPool.java | 8 +-
.../clients/producer/internals/Metadata.java | 170 --
.../internals/ProduceRequestResult.java | 1 -
.../producer/internals/RecordAccumulator.java | 40 +-
.../clients/producer/internals/Sender.java | 1 +
.../clients/tools/ProducerPerformance.java | 8 +-
.../java/org/apache/kafka/common/Cluster.java | 4 +-
.../org/apache/kafka/common/MetricName.java | 4 +-
.../org/apache/kafka/common/PartitionInfo.java | 2 +-
.../apache/kafka/common/config/ConfigDef.java | 82 +-
.../NotEnoughReplicasAfterAppendException.java | 33 +-
.../errors/NotEnoughReplicasException.java | 28 +-
.../message/KafkaLZ4BlockInputStream.java | 233 --
.../message/KafkaLZ4BlockOutputStream.java | 387 ---
.../kafka/common/metrics/JmxReporter.java | 57 +-
.../org/apache/kafka/common/metrics/Sensor.java | 12 +-
.../apache/kafka/common/metrics/stats/Rate.java | 2 +-
.../kafka/common/network/NetworkReceive.java | 2 +-
.../apache/kafka/common/network/Selector.java | 9 +-
.../apache/kafka/common/protocol/ApiKeys.java | 12 +-
.../apache/kafka/common/protocol/Protocol.java | 590 +++--
.../kafka/common/protocol/types/Struct.java | 2 +-
.../common/record/ByteBufferOutputStream.java | 2 +-
.../apache/kafka/common/record/Compressor.java | 29 +-
.../common/record/KafkaLZ4BlockInputStream.java | 234 ++
.../record/KafkaLZ4BlockOutputStream.java | 392 +++
.../kafka/common/record/MemoryRecords.java | 36 +-
.../requests/ConsumerMetadataRequest.java | 2 +-
.../requests/ConsumerMetadataResponse.java | 2 +-
.../kafka/common/requests/FetchRequest.java | 2 +-
.../kafka/common/requests/FetchResponse.java | 2 +-
.../kafka/common/requests/HeartbeatRequest.java | 2 +-
.../common/requests/HeartbeatResponse.java | 2 +-
.../kafka/common/requests/JoinGroupRequest.java | 2 +-
.../common/requests/JoinGroupResponse.java | 2 +-
.../common/requests/ListOffsetRequest.java | 4 +-
.../common/requests/ListOffsetResponse.java | 2 +-
.../kafka/common/requests/MetadataRequest.java | 2 +-
.../kafka/common/requests/MetadataResponse.java | 8 +-
.../common/requests/OffsetCommitRequest.java | 4 +-
.../common/requests/OffsetCommitResponse.java | 2 +-
.../common/requests/OffsetFetchRequest.java | 4 +-
.../common/requests/OffsetFetchResponse.java | 2 +-
.../kafka/common/requests/ProduceRequest.java | 2 +-
.../kafka/common/requests/ProduceResponse.java | 2 +-
.../kafka/common/requests/RequestHeader.java | 8 +-
.../kafka/common/requests/ResponseHeader.java | 4 +-
.../apache/kafka/common/utils/ClientUtils.java | 49 -
.../org/apache/kafka/common/utils/Crc32.java | 2338 ++----------------
.../org/apache/kafka/common/utils/Utils.java | 44 +-
.../apache/kafka/clients/ClientUtilsTest.java | 42 +
.../org/apache/kafka/clients/MockClient.java | 18 +-
.../apache/kafka/clients/NetworkClientTest.java | 17 +-
.../clients/consumer/MockConsumerTest.java | 16 +
.../internals/SubscriptionStateTest.java | 16 +
.../kafka/clients/producer/BufferPoolTest.java | 14 +-
.../kafka/clients/producer/MetadataTest.java | 8 +-
.../clients/producer/MockProducerTest.java | 1 +
.../kafka/clients/producer/PartitionerTest.java | 2 +-
.../clients/producer/RecordAccumulatorTest.java | 3 +-
.../kafka/clients/producer/RecordSendTest.java | 5 +-
.../kafka/clients/producer/SenderTest.java | 6 +-
.../kafka/common/config/AbstractConfigTest.java | 106 +-
.../kafka/common/config/ConfigDefTest.java | 5 +-
.../common/metrics/FakeMetricsReporter.java | 32 +
.../kafka/common/metrics/MetricsTest.java | 4 +-
.../common/metrics/stats/HistogramTest.java | 1 -
.../kafka/common/network/SelectorTest.java | 1 -
.../types/ProtocolSerializationTest.java | 14 +-
.../kafka/common/record/MemoryRecordsTest.java | 2 +-
.../apache/kafka/common/record/RecordTest.java | 4 +-
.../common/requests/RequestResponseTest.java | 23 +-
.../common/serialization/SerializationTest.java | 4 +-
.../kafka/common/utils/ClientUtilsTest.java | 42 -
.../org/apache/kafka/common/utils/CrcTest.java | 8 +-
.../org/apache/kafka/test/Microbenchmarks.java | 1 -
.../java/org/apache/kafka/test/TestUtils.java | 16 +-
.../consumer/ConsumerRebalanceListener.java | 3 -
.../kafka/message/CompressionFactory.scala | 2 +-
.../scala/kafka/tools/KafkaMigrationTool.java | 12 +-
core/src/main/scala/kafka/utils/Crc32.java | 40 +-
.../java/kafka/examples/SimpleConsumerDemo.java | 9 +-
100 files changed, 2225 insertions(+), 3721 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 9bdcf70..784daaf 100644
--- a/README.md
+++ b/README.md
@@ -98,6 +98,9 @@ Please note for this to work you should create/update `~/.gradle/gradle.properti
### Determining how transitive dependencies are added ###
./gradlew core:dependencies --configuration runtime
+
+### Running checkstyle on the java code ###
+ ./gradlew checkstyleMain checkstyleTest
### Running in Vagrant ###
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6844372..0f0fe60 100644
--- a/build.gradle
+++ b/build.gradle
@@ -345,6 +345,7 @@ project(':examples') {
}
project(':clients') {
+ apply plugin: 'checkstyle'
archivesBaseName = "kafka-clients"
dependencies {
@@ -379,4 +380,9 @@ project(':clients') {
artifacts {
archives testJar
}
+
+ checkstyle {
+ configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+ }
+ test.dependsOn('checkstyleMain', 'checkstyleTest')
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
new file mode 100644
index 0000000..a215ff3
--- /dev/null
+++ b/checkstyle/checkstyle.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE module PUBLIC
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+<module name="Checker">
+ <property name="localeLanguage" value="en"/>
+
+ <module name="FileTabCharacter"/>
+
+ <!-- header -->
+ <module name="RegexpHeader">
+ <property name="header" value="/\*\*\nLicensed to the Apache.*"/>
+ </module>
+
+ <module name="TreeWalker">
+
+ <!-- code cleanup -->
+ <module name="UnusedImports"/>
+ <module name="RedundantImport"/>
+ <module name="IllegalImport" />
+ <module name="EqualsHashCode"/>
+ <module name="SimplifyBooleanExpression"/>
+ <module name="OneStatementPerLine"/>
+ <module name="UnnecessaryParentheses" />
+ <module name="SimplifyBooleanReturn"/>
+
+ <!-- style -->
+ <module name="DefaultComesLast"/>
+ <module name="EmptyStatement"/>
+ <module name="ArrayTypeStyle"/>
+ <module name="UpperEll"/>
+ <module name="LeftCurly"/>
+ <module name="RightCurly"/>
+ <module name="EmptyStatement"/>
+ <module name="ConstantName">
+ <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+ </module>
+ <module name="LocalVariableName"/>
+ <module name="LocalFinalVariableName"/>
+ <module name="ClassTypeParameterName"/>
+ <module name="MemberName"/>
+ <module name="MethodTypeParameterName"/>
+ <module name="PackageName"/>
+ <module name="ParameterName"/>
+ <module name="StaticVariableName"/>
+ <module name="TypeName"/>
+
+ <!-- dependencies -->
+ <module name="ImportControl">
+ <property name="file" value="${basedir}/checkstyle/import-control.xml"/>
+ </module>
+
+ <!-- whitespace -->
+ <module name="GenericWhitespace"/>
+ <module name="NoWhitespaceBefore"/>
+ <module name="WhitespaceAfter" />
+ <module name="NoWhitespaceAfter"/>
+ <module name="WhitespaceAround">
+ <property name="allowEmptyConstructors" value="true"/>
+ <property name="allowEmptyMethods" value="true"/>
+ </module>
+ <module name="Indentation"/>
+ <module name="MethodParamPad"/>
+ <module name="ParenPad"/>
+ <module name="TypecastParenPad"/>
+ </module>
+</module>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
new file mode 100644
index 0000000..cca4b38
--- /dev/null
+++ b/checkstyle/import-control.xml
@@ -0,0 +1,100 @@
+<!DOCTYPE import-control PUBLIC
+ "-//Puppy Crawl//DTD Import Control 1.1//EN"
+ "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+<import-control pkg="org.apache.kafka">
+
+ <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+ <!-- common library dependencies -->
+ <allow pkg="java" />
+ <allow pkg="javax.management" />
+ <allow pkg="org.slf4j" />
+ <allow pkg="org.junit" />
+
+ <!-- no one depends on the server -->
+ <disallow pkg="kafka" />
+
+ <!-- anyone can use public classes -->
+ <allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.utils" />
+
+ <subpackage name="common">
+ <disallow pkg="org.apache.kafka.clients" />
+ <allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.test" />
+
+ <subpackage name="config">
+ <allow pkg="org.apache.kafka.common.config" />
+ <!-- for testing -->
+ <allow pkg="org.apache.kafka.common.metrics" />
+ </subpackage>
+
+ <subpackage name="metrics">
+ <allow pkg="org.apache.kafka.common.metrics" />
+ </subpackage>
+
+ <subpackage name="network">
+ <allow pkg="org.apache.kafka.common.metrics" />
+ </subpackage>
+
+ <subpackage name="protocol">
+ <allow pkg="org.apache.kafka.common.errors" />
+ <allow pkg="org.apache.kafka.common.protocol.types" />
+ </subpackage>
+
+ <subpackage name="record">
+ <allow pkg="net.jpountz" />
+ <allow pkg="org.apache.kafka.common.record" />
+ </subpackage>
+
+ <subpackage name="requests">
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.network" />
+ </subpackage>
+
+ <subpackage name="serialization">
+ <allow class="org.apache.kafka.common.errors.SerializationException" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="clients">
+ <allow pkg="org.apache.kafka.common" />
+ <allow pkg="org.slf4j" />
+ <allow pkg="org.apache.kafka.clients" exact-match="true"/>
+ <allow pkg="org.apache.kafka.test" />
+
+ <subpackage name="consumer">
+ <allow pkg="org.apache.kafka.clients.consumer" />
+ </subpackage>
+
+ <subpackage name="producer">
+ <allow pkg="org.apache.kafka.clients.producer" />
+ </subpackage>
+
+ <subpackage name="tools">
+ <allow pkg="org.apache.kafka.clients.producer" />
+ <allow pkg="org.apache.kafka.clients.consumer" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="test">
+ <allow pkg="org.apache.kafka" />
+ </subpackage>
+
+</import-control>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
new file mode 100644
index 0000000..d0da5d7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+
+public class ClientUtils {
+
+ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+ List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+ for (String url : urls) {
+ if (url != null && url.length() > 0) {
+ String host = getHost(url);
+ Integer port = getPort(url);
+ if (host == null || port == null)
+ throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ try {
+ InetSocketAddress address = new InetSocketAddress(host, port);
+ if (address.isUnresolved())
+ throw new ConfigException("DNS resolution failed for url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ addresses.add(address);
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ }
+ }
+ }
+ if (addresses.size() < 1)
+ throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ return addresses;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 574287d..da76cc2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -69,8 +69,7 @@ final class ClusterConnectionStates {
long timeWaited = now - state.lastConnectAttemptMs;
if (state.state == ConnectionState.DISCONNECTED) {
return Math.max(this.reconnectBackoffMs - timeWaited, 0);
- }
- else {
+ } else {
// When connecting or connected, we should be able to delay indefinitely since other events (connection or
// data acked) will cause a wakeup once data can be sent.
return Long.MAX_VALUE;
@@ -109,7 +108,8 @@ final class ClusterConnectionStates {
* @param node The node we have connected to
*/
public void connected(int node) {
- nodeState(node).state = ConnectionState.CONNECTED;
+ NodeConnectionState nodeState = nodeState(node);
+ nodeState.state = ConnectionState.CONNECTED;
}
/**
@@ -117,7 +117,8 @@ final class ClusterConnectionStates {
* @param node The node we have disconnected from
*/
public void disconnected(int node) {
- nodeState(node).state = ConnectionState.DISCONNECTED;
+ NodeConnectionState nodeState = nodeState(node);
+ nodeState.state = ConnectionState.DISCONNECTED;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
new file mode 100644
index 0000000..b8cdd14
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class encapsulating some of the logic around metadata.
+ * <p>
+ * This class is shared by the client thread (for partitioning) and the background sender thread.
+ *
+ * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
+ * topic we don't have any metadata for it will trigger a metadata update.
+ */
+public final class Metadata {
+
+ private static final Logger log = LoggerFactory.getLogger(Metadata.class);
+
+ private final long refreshBackoffMs;
+ private final long metadataExpireMs;
+ private int version;
+ private long lastRefreshMs;
+ private Cluster cluster;
+ private boolean needUpdate;
+ private final Set<String> topics;
+
+ /**
+ * Create a metadata instance with reasonable defaults
+ */
+ public Metadata() {
+ this(100L, 60 * 60 * 1000L);
+ }
+
+ /**
+ * Create a new Metadata instance
+ * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
+ * polling
+ * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+ */
+ public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+ this.refreshBackoffMs = refreshBackoffMs;
+ this.metadataExpireMs = metadataExpireMs;
+ this.lastRefreshMs = 0L;
+ this.version = 0;
+ this.cluster = Cluster.empty();
+ this.needUpdate = false;
+ this.topics = new HashSet<String>();
+ }
+
+ /**
+ * Get the current cluster info without blocking
+ */
+ public synchronized Cluster fetch() {
+ return this.cluster;
+ }
+
+ /**
+ * Add the topic to maintain in the metadata
+ */
+ public synchronized void add(String topic) {
+ topics.add(topic);
+ }
+
+ /**
+ * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
+ * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
+ * is now
+ */
+ public synchronized long timeToNextUpdate(long nowMs) {
+ long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
+ long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
+ return Math.max(timeToExpire, timeToAllowUpdate);
+ }
+
+ /**
+ * Request an update of the current cluster metadata info, return the current version before the update
+ */
+ public synchronized int requestUpdate() {
+ this.needUpdate = true;
+ return this.version;
+ }
+
+ /**
+ * Wait for metadata update until the current version is larger than the last version we know of
+ */
+ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+ if (maxWaitMs < 0) {
+ throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
+ }
+ long begin = System.currentTimeMillis();
+ long remainingWaitMs = maxWaitMs;
+ while (this.version <= lastVersion) {
+ try {
+ if (remainingWaitMs != 0) {
+ wait(remainingWaitMs);
+ }
+ } catch (InterruptedException e) { /* this is fine */
+ }
+ long elapsed = System.currentTimeMillis() - begin;
+ if (elapsed >= maxWaitMs)
+ throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+ remainingWaitMs = maxWaitMs - elapsed;
+ }
+ }
+
+ /**
+ * Add one or more topics to maintain metadata for
+ */
+ public synchronized void addTopics(String... topics) {
+ for (String topic : topics)
+ this.topics.add(topic);
+ requestUpdate();
+ }
+
+ /**
+ * Get the list of topics we are currently maintaining metadata for
+ */
+ public synchronized Set<String> topics() {
+ return new HashSet<String>(this.topics);
+ }
+
+ /**
+ * Update the cluster metadata
+ */
+ public synchronized void update(Cluster cluster, long now) {
+ this.needUpdate = false;
+ this.lastRefreshMs = now;
+ this.version += 1;
+ this.cluster = cluster;
+ notifyAll();
+ log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
+ }
+
+ /**
+ * @return The current metadata version
+ */
+ public synchronized int version() {
+ return this.version;
+ }
+
+ /**
+ * The last time metadata was updated.
+ */
+ public synchronized long lastUpdate() {
+ return this.lastRefreshMs;
+ }
+
+ /**
+ * The metadata refresh backoff in ms
+ */
+ public long refreshBackoff() {
+ return refreshBackoffMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 5950191..fef90a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,7 +19,6 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
-import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
@@ -199,7 +198,7 @@ public class NetworkClient implements KafkaClient {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
- long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
+ long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
// if there is no node available to connect, back off refreshing metadata
long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
waitForMetadataFetch);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
index 072cc2e..7548a9b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
package org.apache.kafka.clients.consumer;
public enum CommitType {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 6d4ff7c..5fb2100 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
* The consumer configuration keys
*/
public class ConsumerConfig extends AbstractConfig {
- private static final ConfigDef config;
+ private static final ConfigDef CONFIG;
/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
@@ -154,7 +154,7 @@ public class ConsumerConfig extends AbstractConfig {
private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
static {
- config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+ CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
@@ -277,11 +277,11 @@ public class ConsumerConfig extends AbstractConfig {
}
ConsumerConfig(Map<? extends Object, ? extends Object> props) {
- super(config, props);
+ super(CONFIG, props);
}
public static void main(String[] args) {
- System.out.println(config.toHtmlTable());
+ System.out.println(CONFIG.toHtmlTable());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 416d703..305ec8e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -26,11 +26,11 @@ import org.apache.kafka.common.utils.AbstractIterator;
* particular topic. There is one for every topic returned by a
* {@link Consumer#poll(long)} operation.
*/
-public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
+public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
- private final Map<TopicPartition, List<ConsumerRecord<K,V>>> records;
+ private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
- public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K,V>>> records) {
+ public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
this.records = records;
}
@@ -39,8 +39,8 @@ public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
*
* @param partition The partition to get records for
*/
- public Iterable<ConsumerRecord<K,V>> records(TopicPartition partition) {
- List<ConsumerRecord<K,V>> recs = this.records.get(partition);
+ public Iterable<ConsumerRecord<K, V>> records(TopicPartition partition) {
+ List<ConsumerRecord<K, V>> recs = this.records.get(partition);
if (recs == null)
return Collections.emptyList();
else
@@ -50,20 +50,20 @@ public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
/**
* Get just the records for the given topic
*/
- public Iterable<ConsumerRecord<K,V>> records(String topic) {
+ public Iterable<ConsumerRecord<K, V>> records(String topic) {
if (topic == null)
throw new IllegalArgumentException("Topic must be non-null.");
- List<List<ConsumerRecord<K,V>>> recs = new ArrayList<List<ConsumerRecord<K,V>>>();
- for (Map.Entry<TopicPartition, List<ConsumerRecord<K,V>>> entry : records.entrySet()) {
+ List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
+ for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
if (entry.getKey().equals(topic))
recs.add(entry.getValue());
}
- return new ConcatenatedIterable<K,V>(recs);
+ return new ConcatenatedIterable<K, V>(recs);
}
@Override
- public Iterator<ConsumerRecord<K,V>> iterator() {
- return new ConcatenatedIterable<K,V>(records.values()).iterator();
+ public Iterator<ConsumerRecord<K, V>> iterator() {
+ return new ConcatenatedIterable<K, V>(records.values()).iterator();
}
/**
@@ -71,26 +71,26 @@ public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
*/
public int count() {
int count = 0;
- for(List<ConsumerRecord<K,V>> recs: this.records.values())
+ for (List<ConsumerRecord<K, V>> recs: this.records.values())
count += recs.size();
return count;
}
- private static class ConcatenatedIterable<K,V> implements Iterable<ConsumerRecord<K,V>> {
+ private static class ConcatenatedIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
- private final Iterable<? extends Iterable<ConsumerRecord<K,V>>> iterables;
+ private final Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables;
- public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K,V>>> iterables) {
+ public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables) {
this.iterables = iterables;
}
@Override
- public Iterator<ConsumerRecord<K,V>> iterator() {
- return new AbstractIterator<ConsumerRecord<K,V>>() {
- Iterator<? extends Iterable<ConsumerRecord<K,V>>> iters = iterables.iterator();
- Iterator<ConsumerRecord<K,V>> current;
+ public Iterator<ConsumerRecord<K, V>> iterator() {
+ return new AbstractIterator<ConsumerRecord<K, V>>() {
+ Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters = iterables.iterator();
+ Iterator<ConsumerRecord<K, V>> current;
- public ConsumerRecord<K,V> makeNext() {
+ public ConsumerRecord<K, V> makeNext() {
if (current == null || !current.hasNext()) {
if (iters.hasNext())
current = iters.next().iterator();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 300c551..09a6f11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.ConnectionState;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@@ -78,7 +78,6 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
-import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -380,7 +379,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
private static final long LATEST_OFFSET_TIMESTAMP = -1L;
- private static final AtomicInteger consumerAutoId = new AtomicInteger(1);
+ private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private final Time time;
private final ConsumerMetrics metrics;
@@ -547,15 +546,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
TimeUnit.MILLISECONDS);
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
String jmxPrefix = "kafka.consumer";
- if(clientId .length() <= 0)
- clientId = "consumer-" + consumerAutoId.getAndIncrement();
+ if (clientId.length() <= 0)
+ clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(jmxPrefix));
Metrics metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricsGroup = "consumer";
@@ -1554,23 +1553,31 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
"The maximum lag for any partition in this window",
tags), new Max());
+ Measurable numParts =
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return subscriptions.assignedPartitions().size();
+ }
+ };
metrics.addMetric(new MetricName("assigned-partitions",
metricsGroup,
"The number of partitions currently assigned to this consumer",
- tags), new Measurable() {
- public double measure(MetricConfig config, long now) {
- return subscriptions.assignedPartitions().size();
- }
- });
-
+ tags),
+ numParts);
+
+
+ Measurable lastHeartbeat =
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+ }
+ };
metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
metricsGroup,
"The number of seconds since the last controller heartbeat",
- tags), new Measurable() {
- public double measure(MetricConfig config, long now) {
- return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
- }
- });
+ tags),
+
+ lastHeartbeat);
}
public void recordTopicFetchMetrics(String topic, int bytes, int records) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index d9483ec..ee0751e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
package org.apache.kafka.clients.consumer.internals;
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
index 7e57a39..c06ab3a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.TopicPartition;
public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
@Override
- public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
@Override
- public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+ public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 71ce20d..d41d306 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
package org.apache.kafka.clients.consumer.internals;
import java.util.HashMap;
@@ -58,8 +70,8 @@ public class SubscriptionState {
throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
this.subscribedTopics.remove(topic);
this.needsPartitionAssignment = true;
- for(TopicPartition tp: assignedPartitions())
- if(topic.equals(tp.topic()))
+ for (TopicPartition tp: assignedPartitions())
+ if (topic.equals(tp.topic()))
clearPartition(tp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ebc4c53..1fd6917 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -19,8 +19,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.clients.producer.internals.Partitioner;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
@@ -45,7 +46,6 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@@ -60,9 +60,10 @@ import org.slf4j.LoggerFactory;
* The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
* needs to communicate with. Failure to close the producer after use will leak these resources.
*/
-public class KafkaProducer<K,V> implements Producer<K,V> {
+public class KafkaProducer<K, V> implements Producer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
+ private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private final Partitioner partitioner;
private final int maxRequestSize;
@@ -79,7 +80,6 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final ProducerConfig producerConfig;
- private static final AtomicInteger producerAutoId = new AtomicInteger(1);
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -154,6 +154,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
return newProperties;
}
+ @SuppressWarnings("unchecked")
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
log.trace("Starting the Kafka producer");
this.producerConfig = config;
@@ -162,8 +163,8 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
- if(clientId.length() <= 0)
- clientId = "producer-" + producerAutoId.getAndIncrement();
+ if (clientId.length() <= 0)
+ clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
String jmxPrefix = "kafka.producer";
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
@@ -216,16 +217,16 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(), true);
- }
- else
+ } else {
this.keySerializer = keySerializer;
+ }
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
- }
- else
+ } else {
this.valueSerializer = valueSerializer;
+ }
config.logUnused();
log.debug("Kafka producer started");
@@ -244,7 +245,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
* @param record The record to be sent
*/
@Override
- public Future<RecordMetadata> send(ProducerRecord<K,V> record) {
+ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
@@ -309,7 +310,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
* indicates no callback)
*/
@Override
- public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
+ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
try {
// first make sure the metadata for the topic is available
waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 6b2471f..17fe541 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.MetricName;
* @see KafkaProducer
* @see MockProducer
*/
-public interface Producer<K,V> extends Closeable {
+public interface Producer<K, V> extends Closeable {
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
@@ -39,12 +39,12 @@ public interface Producer<K,V> extends Closeable {
* @param record The record to send
* @return A future which will eventually contain the response information
*/
- public Future<RecordMetadata> send(ProducerRecord<K,V> record);
+ public Future<RecordMetadata> send(ProducerRecord<K, V> record);
/**
* Send a record and invoke the given callback when the record has been acknowledged by the server
*/
- public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
+ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
/**
* Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 9a43d66..122375c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -35,7 +35,7 @@ public class ProducerConfig extends AbstractConfig {
* CHANGE WILL BREAK USER CODE.
*/
- private static final ConfigDef config;
+ private static final ConfigDef CONFIG;
/** <code>bootstrap.servers</code> */
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
@@ -167,13 +167,13 @@ public class ProducerConfig extends AbstractConfig {
private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
static {
- config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
Type.STRING,
"1",
- in("all","-1", "0", "1"),
+ in("all", "-1", "0", "1"),
Importance.HIGH,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
@@ -218,11 +218,11 @@ public class ProducerConfig extends AbstractConfig {
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
- super(config, props);
+ super(CONFIG, props);
}
public static void main(String[] args) {
- System.out.println(config.toHtmlTable());
+ System.out.println(CONFIG.toHtmlTable());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 8d4156d..4cb1e50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -80,11 +80,11 @@ public final class BufferPool {
this.time = time;
this.waitTime = this.metrics.sensor("bufferpool-wait-time");
MetricName metricName = new MetricName("bufferpool-wait-ratio",
- metricGrpName,
- "The fraction of time an appender waits for space allocation.",
- metricTags);
+ metricGrpName,
+ "The fraction of time an appender waits for space allocation.",
+ metricTags);
this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
- }
+ }
/**
* Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
deleted file mode 100644
index 3aff624..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A class encapsulating some of the logic around metadata.
- * <p>
- * This class is shared by the client thread (for partitioning) and the background sender thread.
- *
- * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
- * topic we don't have any metadata for it will trigger a metadata update.
- */
-public final class Metadata {
-
- private static final Logger log = LoggerFactory.getLogger(Metadata.class);
-
- private final long refreshBackoffMs;
- private final long metadataExpireMs;
- private int version;
- private long lastRefreshMs;
- private Cluster cluster;
- private boolean needUpdate;
- private final Set<String> topics;
-
- /**
- * Create a metadata instance with reasonable defaults
- */
- public Metadata() {
- this(100L, 60 * 60 * 1000L);
- }
-
- /**
- * Create a new Metadata instance
- * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
- * polling
- * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
- */
- public Metadata(long refreshBackoffMs, long metadataExpireMs) {
- this.refreshBackoffMs = refreshBackoffMs;
- this.metadataExpireMs = metadataExpireMs;
- this.lastRefreshMs = 0L;
- this.version = 0;
- this.cluster = Cluster.empty();
- this.needUpdate = false;
- this.topics = new HashSet<String>();
- }
-
- /**
- * Get the current cluster info without blocking
- */
- public synchronized Cluster fetch() {
- return this.cluster;
- }
-
- /**
- * Add the topic to maintain in the metadata
- */
- public synchronized void add(String topic) {
- topics.add(topic);
- }
-
- /**
- * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
- * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
- * is now
- */
- public synchronized long timeToNextUpdate(long nowMs) {
- long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
- long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
- return Math.max(timeToExpire, timeToAllowUpdate);
- }
-
- /**
- * Request an update of the current cluster metadata info, return the current version before the update
- */
- public synchronized int requestUpdate() {
- this.needUpdate = true;
- return this.version;
- }
-
- /**
- * Wait for metadata update until the current version is larger than the last version we know of
- */
- public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
- if (maxWaitMs < 0) {
- throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
- }
- long begin = System.currentTimeMillis();
- long remainingWaitMs = maxWaitMs;
- while (this.version <= lastVersion) {
- try {
- if (remainingWaitMs != 0) {
- wait(remainingWaitMs);
- }
- } catch (InterruptedException e) { /* this is fine */
- }
- long elapsed = System.currentTimeMillis() - begin;
- if (elapsed >= maxWaitMs)
- throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
- remainingWaitMs = maxWaitMs - elapsed;
- }
- }
-
- /**
- * Add one or more topics to maintain metadata for
- */
- public synchronized void addTopics(String... topics) {
- for (String topic : topics)
- this.topics.add(topic);
- requestUpdate();
- }
-
- /**
- * Get the list of topics we are currently maintaining metadata for
- */
- public synchronized Set<String> topics() {
- return new HashSet<String>(this.topics);
- }
-
- /**
- * Update the cluster metadata
- */
- public synchronized void update(Cluster cluster, long now) {
- this.needUpdate = false;
- this.lastRefreshMs = now;
- this.version += 1;
- this.cluster = cluster;
- notifyAll();
- log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
- }
-
- /**
- * @return The current metadata version
- */
- public synchronized int version() {
- return this.version;
- }
-
- /**
- * The last time metadata was updated.
- */
- public synchronized long lastUpdate() {
- return this.lastRefreshMs;
- }
-
- /**
- * The metadata refresh backoff in ms
- */
- public long refreshBackoff() {
- return refreshBackoffMs;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
index b70ece7..8e5855d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.producer.internals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 50889e4..ecfe214 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -102,26 +102,27 @@ public final class RecordAccumulator {
private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) {
MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags);
- metrics.addMetric(metricName,
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.queued();
- }
- });
+ Measurable waitingThreads = new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.queued();
+ }
+ };
+ metrics.addMetric(metricName, waitingThreads);
+
metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags);
- metrics.addMetric(metricName,
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.totalMemory();
- }
- });
+ Measurable totalBytes = new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.totalMemory();
+ }
+ };
+ metrics.addMetric(metricName, totalBytes);
metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags);
- metrics.addMetric(metricName,
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.availableMemory();
- }
- });
+ Measurable availableBytes = new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.availableMemory();
+ }
+ };
+ metrics.addMetric(metricName, availableBytes);
}
/**
@@ -228,8 +229,7 @@ public final class RecordAccumulator {
boolean sendable = full || expired || exhausted || closed;
if (sendable && !backingOff) {
readyNodes.add(leader);
- }
- else {
+ } else {
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8726809..ed9c63a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 689bae9..13f4d59 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -45,12 +45,12 @@ public class ProducerPerformance {
}
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[],byte[]>(props);
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
/* setup perf test */
byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
- ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>(topicName, payload);
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
long sleepTime = NS_PER_SEC / throughput;
long sleepDeficitNs = 0;
Stats stats = new Stats(numRecords, 5000);
@@ -66,8 +66,8 @@ public class ProducerPerformance {
* and then make up the whole deficit in one longer sleep.
*/
if (throughput > 0) {
- float elapsed = (sendStart - start)/1000.f;
- if (elapsed > 0 && i/elapsed > throughput) {
+ float elapsed = (sendStart - start) / 1000.f;
+ if (elapsed > 0 && i / elapsed > throughput) {
sleepDeficitNs += sleepTime;
if (sleepDeficitNs >= MIN_SLEEP_NS) {
long sleepMs = sleepDeficitNs / 1000000;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index d7ccbcd..8fcd291 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -40,8 +40,8 @@ public final class Cluster {
this.nodes = Collections.unmodifiableList(copy);
this.nodesById = new HashMap<Integer, Node>();
- for(Node node: nodes)
- this.nodesById.put(node.id(), node);
+ for (Node node: nodes)
+ this.nodesById.put(node.id(), node);
// index the partitions by topic/partition for quick lookup
this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/MetricName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 7e977e9..04b4a09 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -90,8 +90,8 @@ public final class MetricName {
throw new IllegalArgumentException("keyValue needs to be specified in paris");
Map<String, String> tags = new HashMap<String, String>();
- for (int i=0; i<(keyValue.length / 2); i++)
- tags.put(keyValue[i], keyValue[i+1]);
+ for (int i = 0; i < keyValue.length / 2; i++)
+ tags.put(keyValue[i], keyValue[i + 1]);
return tags;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index 28562f9..321da8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -72,7 +72,7 @@ public class PartitionInfo {
return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s",
topic,
partition,
- leader == null? "none" : leader.id(),
+ leader == null ? "none" : leader.id(),
fmtNodeIds(replicas),
fmtNodeIds(inSyncReplicas));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 38ce10b..8523333 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -53,6 +53,7 @@ public class ConfigDef {
/**
* Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
+ *
* @return new unmodifiable {@link Set} instance containing the keys
*/
public Set<String> names() {
@@ -61,6 +62,7 @@ public class ConfigDef {
/**
* Define a new configuration
+ *
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
@@ -69,16 +71,23 @@ public class ConfigDef {
* @param documentation The documentation string for the config
* @return This ConfigDef so you can chain calls
*/
- public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+ public ConfigDef define(String name,
+ Type type,
+ Object defaultValue,
+ Validator validator,
+ Importance importance,
+ String documentation) {
if (configKeys.containsKey(name))
throw new ConfigException("Configuration " + name + " is defined twice.");
- Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
+ Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE
+ : parseType(name, defaultValue, type);
configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
return this;
}
/**
* Define a new configuration with no special validation logic
+ *
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
@@ -92,6 +101,7 @@ public class ConfigDef {
/**
* Define a required parameter with no default value
+ *
* @param name The name of the config parameter
* @param type The type of the config
* @param validator A validator to use in checking the correctness of the config
@@ -105,6 +115,7 @@ public class ConfigDef {
/**
* Define a required parameter with no default value and no special validation logic
+ *
* @param name The name of the config parameter
* @param type The type of the config
* @param importance The importance of this config: is this something you will likely need to change.
@@ -120,6 +131,7 @@ public class ConfigDef {
* that the keys of the map are strings, but the values can either be strings or they may already be of the
* appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
* programmatically constructed map.
+ *
* @param props The configs to parse and validate
* @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
* the appropriate type (int, string, etc)
@@ -132,7 +144,8 @@ public class ConfigDef {
if (props.containsKey(key.name))
value = parseType(key.name, props.get(key.name), key.type);
else if (key.defaultValue == NO_DEFAULT_VALUE)
- throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
+ throw new ConfigException("Missing required configuration \"" + key.name
+ + "\" which has no default value.");
else
value = key.defaultValue;
if (key.validator != null)
@@ -144,6 +157,7 @@ public class ConfigDef {
/**
* Parse a value according to its expected type.
+ *
* @param name The config name
* @param value The config value
* @param type The expected type
@@ -157,14 +171,13 @@ public class ConfigDef {
switch (type) {
case BOOLEAN:
if (value instanceof String) {
- if (trimmed.equalsIgnoreCase("true"))
- return true;
- else if (trimmed.equalsIgnoreCase("false"))
- return false;
- else
- throw new ConfigException(name, value, "Expected value to be either true or false");
- }
- else if (value instanceof Boolean)
+ if (trimmed.equalsIgnoreCase("true"))
+ return true;
+ else if (trimmed.equalsIgnoreCase("false"))
+ return false;
+ else
+ throw new ConfigException(name, value, "Expected value to be either true or false");
+ } else if (value instanceof Boolean)
return value;
else
throw new ConfigException(name, value, "Expected value to be either true or false");
@@ -172,7 +185,8 @@ public class ConfigDef {
if (value instanceof String)
return trimmed;
else
- throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
+ throw new ConfigException(name, value, "Expected value to be a string, but it was a "
+ + value.getClass().getName());
case INT:
if (value instanceof Integer) {
return (Integer) value;
@@ -256,6 +270,7 @@ public class ConfigDef {
/**
* A numeric range that checks only the lower bound
+ *
* @param min The minimum acceptable value
*/
public static Range atLeast(Number min) {
@@ -287,32 +302,30 @@ public class ConfigDef {
}
}
- public static class ValidString implements Validator {
- List<String> validStrings;
+ public static class ValidString implements Validator {
+ private final List<String> validStrings;
- private ValidString(List<String> validStrings) {
- this.validStrings = validStrings;
- }
+ private ValidString(List<String> validStrings) {
+ this.validStrings = validStrings;
+ }
- public static ValidString in(String... validStrings) {
- return new ValidString(Arrays.asList(validStrings));
- }
+ public static ValidString in(String... validStrings) {
+ return new ValidString(Arrays.asList(validStrings));
+ }
- @Override
- public void ensureValid(String name, Object o) {
- String s = (String) o;
- if (!validStrings.contains(s)) {
- throw new ConfigException(name,o,"String must be one of: " + Utils.join(validStrings, ", "));
- }
+ @Override
+ public void ensureValid(String name, Object o) {
+ String s = (String) o;
+ if (!validStrings.contains(s))
+ throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+ }
- }
+ public String toString() {
+ return "[" + Utils.join(validStrings, ", ") + "]";
+ }
- public String toString() {
- return "[" + Utils.join(validStrings, ", ") + "]";
}
- }
-
private static class ConfigKey {
public final String name;
public final Type type;
@@ -321,7 +334,12 @@ public class ConfigDef {
public final Validator validator;
public final Importance importance;
- public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+ public ConfigKey(String name,
+ Type type,
+ Object defaultValue,
+ Validator validator,
+ Importance importance,
+ String documentation) {
super();
this.name = name;
this.type = type;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
index 75c80a9..a6107b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,27 +17,26 @@
package org.apache.kafka.common.errors;
/**
- * Number of insync replicas for the partition is lower than min.insync.replicas
- * This exception is raised when the low ISR size is discovered *after* the message
- * was already appended to the log. Producer retries will cause duplicates.
+ * Number of insync replicas for the partition is lower than min.insync.replicas This exception is raised when the low
+ * ISR size is discovered *after* the message was already appended to the log. Producer retries will cause duplicates.
*/
public class NotEnoughReplicasAfterAppendException extends RetriableException {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public NotEnoughReplicasAfterAppendException() {
- super();
- }
+ public NotEnoughReplicasAfterAppendException() {
+ super();
+ }
- public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
- super(message,cause);
- }
+ public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
+ super(message, cause);
+ }
- public NotEnoughReplicasAfterAppendException(String message) {
- super(message);
- }
+ public NotEnoughReplicasAfterAppendException(String message) {
+ super(message);
+ }
- public NotEnoughReplicasAfterAppendException(Throwable cause) {
- super(cause);
- }
+ public NotEnoughReplicasAfterAppendException(Throwable cause) {
+ super(cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
index 486d515..1573227 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,21 +20,21 @@ package org.apache.kafka.common.errors;
* Number of insync replicas for the partition is lower than min.insync.replicas
*/
public class NotEnoughReplicasException extends RetriableException {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public NotEnoughReplicasException() {
- super();
- }
+ public NotEnoughReplicasException() {
+ super();
+ }
- public NotEnoughReplicasException(String message, Throwable cause) {
- super(message, cause);
- }
+ public NotEnoughReplicasException(String message, Throwable cause) {
+ super(message, cause);
+ }
- public NotEnoughReplicasException(String message) {
- super(message);
- }
+ public NotEnoughReplicasException(String message) {
+ super(message);
+ }
- public NotEnoughReplicasException(Throwable cause) {
- super(cause);
- }
+ public NotEnoughReplicasException(Throwable cause) {
+ super(cause);
+ }
}
[5/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
Posted by jk...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
deleted file mode 100644
index 5be72fe..0000000
--- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.message;
-
-import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
-import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
-import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.MAGIC;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.BD;
-import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.FLG;
-import org.apache.kafka.common.utils.Utils;
-
-import net.jpountz.lz4.LZ4Exception;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4SafeDecompressor;
-import net.jpountz.xxhash.XXHash32;
-import net.jpountz.xxhash.XXHashFactory;
-
-/**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
- *
- * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
- */
-public final class KafkaLZ4BlockInputStream extends FilterInputStream {
-
- public static final String PREMATURE_EOS = "Stream ended prematurely";
- public static final String NOT_SUPPORTED = "Stream unsupported";
- public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
- public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
-
- private final LZ4SafeDecompressor decompressor;
- private final XXHash32 checksum;
- private final byte[] buffer;
- private final byte[] compressedBuffer;
- private final int maxBlockSize;
- private FLG flg;
- private BD bd;
- private int bufferOffset;
- private int bufferSize;
- private boolean finished;
-
- /**
- * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
- *
- * @param in The stream to decompress
- * @throws IOException
- */
- public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
- super(in);
- decompressor = LZ4Factory.fastestInstance().safeDecompressor();
- checksum = XXHashFactory.fastestInstance().hash32();
- readHeader();
- maxBlockSize = bd.getBlockMaximumSize();
- buffer = new byte[maxBlockSize];
- compressedBuffer = new byte[maxBlockSize];
- bufferOffset = 0;
- bufferSize = 0;
- finished = false;
- }
-
- /**
- * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
- *
- * @throws IOException
- */
- private void readHeader() throws IOException {
- byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
-
- // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
- bufferOffset = 6;
- if (in.read(header, 0, bufferOffset) != bufferOffset) {
- throw new IOException(PREMATURE_EOS);
- }
-
- if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset-6)) {
- throw new IOException(NOT_SUPPORTED);
- }
- flg = FLG.fromByte(header[bufferOffset-2]);
- bd = BD.fromByte(header[bufferOffset-1]);
- // TODO read uncompressed content size, update flg.validate()
- // TODO read dictionary id, update flg.validate()
-
- // check stream descriptor hash
- byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
- header[bufferOffset++] = (byte) in.read();
- if (hash != header[bufferOffset-1]) {
- throw new IOException(DESCRIPTOR_HASH_MISMATCH);
- }
- }
-
- /**
- * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum,
- * and writes the result to a buffer.
- *
- * @throws IOException
- */
- private void readBlock() throws IOException {
- int blockSize = Utils.readUnsignedIntLE(in);
-
- // Check for EndMark
- if (blockSize == 0) {
- finished = true;
- // TODO implement content checksum, update flg.validate()
- return;
- } else if (blockSize > maxBlockSize) {
- throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
- }
-
- boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
- byte[] bufferToRead;
- if (compressed) {
- bufferToRead = compressedBuffer;
- } else {
- blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
- bufferToRead = buffer;
- bufferSize = blockSize;
- }
-
- if (in.read(bufferToRead, 0, blockSize) != blockSize) {
- throw new IOException(PREMATURE_EOS);
- }
-
- // verify checksum
- if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
- throw new IOException(BLOCK_HASH_MISMATCH);
- }
-
- if (compressed) {
- try {
- bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
- } catch (LZ4Exception e) {
- throw new IOException(e);
- }
- }
-
- bufferOffset = 0;
- }
-
- @Override
- public int read() throws IOException {
- if (finished) {
- return -1;
- }
- if (available() == 0) {
- readBlock();
- }
- if (finished) {
- return -1;
- }
- int value = buffer[bufferOffset++] & 0xFF;
-
- return value;
- }
-
- @Override
- public int read(byte b[], int off, int len) throws IOException {
- net.jpountz.util.Utils.checkRange(b, off, len);
- if (finished) {
- return -1;
- }
- if (available() == 0) {
- readBlock();
- }
- if (finished) {
- return -1;
- }
- len = Math.min(len, available());
- System.arraycopy(buffer, bufferOffset, b, off, len);
- bufferOffset += len;
- return len;
- }
-
- @Override
- public long skip(long n) throws IOException {
- if (finished) {
- return 0;
- }
- if (available() == 0) {
- readBlock();
- }
- if (finished) {
- return 0;
- }
- n = Math.min(n, available());
- bufferOffset += n;
- return n;
- }
-
- @Override
- public int available() throws IOException {
- return bufferSize - bufferOffset;
- }
-
- @Override
- public void close() throws IOException {
- in.close();
- }
-
- @Override
- public synchronized void mark(int readlimit) {
- throw new RuntimeException("mark not supported");
- }
-
- @Override
- public synchronized void reset() throws IOException {
- throw new RuntimeException("reset not supported");
- }
-
- @Override
- public boolean markSupported() {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
deleted file mode 100644
index e5b9e43..0000000
--- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.message;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.kafka.common.utils.Utils;
-
-import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.xxhash.XXHash32;
-import net.jpountz.xxhash.XXHashFactory;
-
-/**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
- *
- * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
- */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
-
- public static final int MAGIC = 0x184D2204;
- public static final int LZ4_MAX_HEADER_LENGTH = 19;
- public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
-
- public static final String CLOSED_STREAM = "The stream is already closed";
-
- public static final int BLOCKSIZE_64KB = 4;
- public static final int BLOCKSIZE_256KB = 5;
- public static final int BLOCKSIZE_1MB = 6;
- public static final int BLOCKSIZE_4MB = 7;
-
- private final LZ4Compressor compressor;
- private final XXHash32 checksum;
- private final FLG flg;
- private final BD bd;
- private final byte[] buffer;
- private final byte[] compressedBuffer;
- private final int maxBlockSize;
- private int bufferOffset;
- private boolean finished;
-
- /**
- * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
- *
- * @param out The output stream to compress
- * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
- * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for every block of data
- * @throws IOException
- */
- public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
- super(out);
- compressor = LZ4Factory.fastestInstance().fastCompressor();
- checksum = XXHashFactory.fastestInstance().hash32();
- bd = new BD(blockSize);
- flg = new FLG(blockChecksum);
- bufferOffset = 0;
- maxBlockSize = bd.getBlockMaximumSize();
- buffer = new byte[maxBlockSize];
- compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
- finished = false;
- writeHeader();
- }
-
- /**
- * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
- *
- * @param out The stream to compress
- * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
- * @throws IOException
- */
- public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
- this(out, blockSize, false);
- }
-
- /**
- * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
- *
- * @param out The output stream to compress
- * @throws IOException
- */
- public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
- this(out, BLOCKSIZE_64KB);
- }
-
- /**
- * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
- *
- * @throws IOException
- */
- private void writeHeader() throws IOException {
- Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
- bufferOffset = 4;
- buffer[bufferOffset++] = flg.toByte();
- buffer[bufferOffset++] = bd.toByte();
- // TODO write uncompressed content size, update flg.validate()
- // TODO write dictionary id, update flg.validate()
- // compute checksum on all descriptor fields
- int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
- buffer[bufferOffset++] = (byte) hash;
- // write out frame descriptor
- out.write(buffer, 0, bufferOffset);
- bufferOffset = 0;
- }
-
- /**
- * Compresses buffered data, optionally computes an XXHash32 checksum, and writes
- * the result to the underlying {@link OutputStream}.
- *
- * @throws IOException
- */
- private void writeBlock() throws IOException {
- if (bufferOffset == 0) {
- return;
- }
-
- int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
- byte[] bufferToWrite = compressedBuffer;
- int compressMethod = 0;
-
- // Store block uncompressed if compressed length is greater (incompressible)
- if (compressedLength >= bufferOffset) {
- bufferToWrite = buffer;
- compressedLength = bufferOffset;
- compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
- }
-
- // Write content
- Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
- out.write(bufferToWrite, 0, compressedLength);
-
- // Calculate and write block checksum
- if (flg.isBlockChecksumSet()) {
- int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
- Utils.writeUnsignedIntLE(out, hash);
- }
- bufferOffset = 0;
- }
-
- /**
- * Similar to the {@link #writeBlock()} method. Writes a 0-length block
- * (without block checksum) to signal the end of the block stream.
- *
- * @throws IOException
- */
- private void writeEndMark() throws IOException {
- Utils.writeUnsignedIntLE(out, 0);
- // TODO implement content checksum, update flg.validate()
- finished = true;
- }
-
- @Override
- public void write(int b) throws IOException {
- ensureNotFinished();
- if (bufferOffset == maxBlockSize) {
- writeBlock();
- }
- buffer[bufferOffset++] = (byte) b;
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- net.jpountz.util.Utils.checkRange(b, off, len);
- ensureNotFinished();
-
- int bufferRemainingLength = maxBlockSize - bufferOffset;
- // while b will fill the buffer
- while (len > bufferRemainingLength) {
- // fill remaining space in buffer
- System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
- bufferOffset = maxBlockSize;
- writeBlock();
- // compute new offset and length
- off += bufferRemainingLength;
- len -= bufferRemainingLength;
- bufferRemainingLength = maxBlockSize;
- }
-
- System.arraycopy(b, off, buffer, bufferOffset, len);
- bufferOffset += len;
- }
-
- @Override
- public void flush() throws IOException {
- if (!finished) {
- writeBlock();
- }
- if (out != null) {
- out.flush();
- }
- }
-
- /**
- * A simple state check to ensure the stream is still open.
- */
- private void ensureNotFinished() {
- if (finished) {
- throw new IllegalStateException(CLOSED_STREAM);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (!finished) {
- writeEndMark();
- flush();
- finished = true;
- }
- if (out != null) {
- out.close();
- out = null;
- }
- }
-
- public static class FLG {
-
- private static final int VERSION = 1;
-
- private final int presetDictionary;
- private final int reserved1;
- private final int contentChecksum;
- private final int contentSize;
- private final int blockChecksum;
- private final int blockIndependence;
- private final int version;
-
- public FLG() {
- this(false);
- }
-
- public FLG(boolean blockChecksum) {
- this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
- }
-
- private FLG(int presetDictionary, int reserved1, int contentChecksum,
- int contentSize, int blockChecksum, int blockIndependence, int version) {
- this.presetDictionary = presetDictionary;
- this.reserved1 = reserved1;
- this.contentChecksum = contentChecksum;
- this.contentSize = contentSize;
- this.blockChecksum = blockChecksum;
- this.blockIndependence = blockIndependence;
- this.version = version;
- validate();
- }
-
- public static FLG fromByte(byte flg) {
- int presetDictionary = (flg >>> 0) & 1;
- int reserved1 = (flg >>> 1) & 1;
- int contentChecksum = (flg >>> 2) & 1;
- int contentSize = (flg >>> 3) & 1;
- int blockChecksum = (flg >>> 4) & 1;
- int blockIndependence = (flg >>> 5) & 1;
- int version = (flg >>> 6) & 3;
-
- return new FLG(presetDictionary, reserved1, contentChecksum,
- contentSize, blockChecksum, blockIndependence, version);
- }
-
- public byte toByte() {
- return (byte) (
- ((presetDictionary & 1) << 0)
- | ((reserved1 & 1) << 1)
- | ((contentChecksum & 1) << 2)
- | ((contentSize & 1) << 3)
- | ((blockChecksum & 1) << 4)
- | ((blockIndependence & 1) << 5)
- | ((version & 3) << 6) );
- }
-
- private void validate() {
- if (presetDictionary != 0) {
- throw new RuntimeException("Preset dictionary is unsupported");
- }
- if (reserved1 != 0) {
- throw new RuntimeException("Reserved1 field must be 0");
- }
- if (contentChecksum != 0) {
- throw new RuntimeException("Content checksum is unsupported");
- }
- if (contentSize != 0) {
- throw new RuntimeException("Content size is unsupported");
- }
- if (blockIndependence != 1) {
- throw new RuntimeException("Dependent block stream is unsupported");
- }
- if (version != VERSION) {
- throw new RuntimeException(String.format("Version %d is unsupported", version));
- }
- }
-
- public boolean isPresetDictionarySet() {
- return presetDictionary == 1;
- }
-
- public boolean isContentChecksumSet() {
- return contentChecksum == 1;
- }
-
- public boolean isContentSizeSet() {
- return contentSize == 1;
- }
-
- public boolean isBlockChecksumSet() {
- return blockChecksum == 1;
- }
-
- public boolean isBlockIndependenceSet() {
- return blockIndependence == 1;
- }
-
- public int getVersion() {
- return version;
- }
- }
-
- public static class BD {
-
- private final int reserved2;
- private final int blockSizeValue;
- private final int reserved3;
-
- public BD() {
- this(0, BLOCKSIZE_64KB, 0);
- }
-
- public BD(int blockSizeValue) {
- this(0, blockSizeValue, 0);
- }
-
- private BD(int reserved2, int blockSizeValue, int reserved3) {
- this.reserved2 = reserved2;
- this.blockSizeValue = blockSizeValue;
- this.reserved3 = reserved3;
- validate();
- }
-
- public static BD fromByte(byte bd) {
- int reserved2 = (bd >>> 0) & 15;
- int blockMaximumSize = (bd >>> 4) & 7;
- int reserved3 = (bd >>> 7) & 1;
-
- return new BD(reserved2, blockMaximumSize, reserved3);
- }
-
- private void validate() {
- if (reserved2 != 0) {
- throw new RuntimeException("Reserved2 field must be 0");
- }
- if (blockSizeValue < 4 || blockSizeValue > 7) {
- throw new RuntimeException("Block size value must be between 4 and 7");
- }
- if (reserved3 != 0) {
- throw new RuntimeException("Reserved3 field must be 0");
- }
- }
-
- // 2^(2n+8)
- public int getBlockMaximumSize() {
- return (1 << ((2 * blockSizeValue) + 8));
- }
-
- public byte toByte() {
- return (byte) (
- ((reserved2 & 15) << 0)
- | ((blockSizeValue & 7) << 4)
- | ((reserved3 & 1) << 7) );
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 9c20538..6b9590c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
public class JmxReporter implements MetricsReporter {
private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
- private static final Object lock = new Object();
+ private static final Object LOCK = new Object();
private String prefix;
private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
@@ -58,12 +58,11 @@ public class JmxReporter implements MetricsReporter {
}
@Override
- public void configure(Map<String, ?> configs) {
- }
+ public void configure(Map<String, ?> configs) {}
@Override
public void init(List<KafkaMetric> metrics) {
- synchronized (lock) {
+ synchronized (LOCK) {
for (KafkaMetric metric : metrics)
addAttribute(metric);
for (KafkaMbean mbean : mbeans.values())
@@ -73,7 +72,7 @@ public class JmxReporter implements MetricsReporter {
@Override
public void metricChange(KafkaMetric metric) {
- synchronized (lock) {
+ synchronized (LOCK) {
KafkaMbean mbean = addAttribute(metric);
reregister(mbean);
}
@@ -86,36 +85,35 @@ public class JmxReporter implements MetricsReporter {
if (!this.mbeans.containsKey(mBeanName))
mbeans.put(mBeanName, new KafkaMbean(mBeanName));
KafkaMbean mbean = this.mbeans.get(mBeanName);
- mbean.setAttribute(metricName.name() , metric);
+ mbean.setAttribute(metricName.name(), metric);
return mbean;
} catch (JMException e) {
throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
}
}
- /**
- * @param metricName
- * @return standard JMX MBean name in the following format
- * domainName:type=metricType,key1=val1,key2=val2
- */
- private String getMBeanName(MetricName metricName) {
- StringBuilder mBeanName = new StringBuilder();
- mBeanName.append(prefix);
- mBeanName.append(":type=");
- mBeanName.append(metricName.group());
- for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
- if(entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
- continue;
- mBeanName.append(",");
- mBeanName.append(entry.getKey());
- mBeanName.append("=");
- mBeanName.append(entry.getValue());
+ /**
+ * @param metricName
+ * @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
+ */
+ private String getMBeanName(MetricName metricName) {
+ StringBuilder mBeanName = new StringBuilder();
+ mBeanName.append(prefix);
+ mBeanName.append(":type=");
+ mBeanName.append(metricName.group());
+ for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
+ if (entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
+ continue;
+ mBeanName.append(",");
+ mBeanName.append(entry.getKey());
+ mBeanName.append("=");
+ mBeanName.append(entry.getValue());
+ }
+ return mBeanName.toString();
}
- return mBeanName.toString();
- }
public void close() {
- synchronized (lock) {
+ synchronized (LOCK) {
for (KafkaMbean mbean : this.mbeans.values())
unregister(mbean);
}
@@ -185,7 +183,12 @@ public class JmxReporter implements MetricsReporter {
for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
String attribute = entry.getKey();
KafkaMetric metric = entry.getValue();
- attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.metricName().description(), true, false, false);
+ attrs[i] = new MBeanAttributeInfo(attribute,
+ double.class.getName(),
+ metric.metricName().description(),
+ true,
+ false,
+ false);
i += 1;
}
return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index e53cfaa..ca823fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -147,7 +147,7 @@ public final class Sensor {
* @param stat The statistic to keep
*/
public void add(MetricName metricName, MeasurableStat stat) {
- add(metricName, stat, null);
+ add(metricName, stat, null);
}
/**
@@ -157,11 +157,11 @@ public final class Sensor {
* @param config A special configuration for this metric. If null use the sensor default configuration.
*/
public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
- KafkaMetric metric = new KafkaMetric(new Object(),
- Utils.notNull(metricName),
- Utils.notNull(stat),
- config == null ? this.config : config,
- time);
+ KafkaMetric metric = new KafkaMetric(new Object(),
+ Utils.notNull(metricName),
+ Utils.notNull(stat),
+ config == null ? this.config : config,
+ time);
this.registry.registerMetric(metric);
this.metrics.add(metric);
this.stats.add(stat);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index a5838b3..98429da 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -71,7 +71,7 @@ public class Rate implements MeasurableStat {
case MILLISECONDS:
return time;
case SECONDS:
- return time / (1000.0);
+ return time / 1000.0;
case MINUTES:
return time / (60.0 * 1000.0);
case HOURS:
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index dcc639a..fc0d168 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -50,7 +50,7 @@ public class NetworkReceive implements Receive {
@Override
public ByteBuffer[] reify() {
- return new ByteBuffer[] { this.buffer };
+ return new ByteBuffer[] {this.buffer};
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index e18a769..6baad93 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -14,7 +14,6 @@ package org.apache.kafka.common.network;
import java.io.EOFException;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
@@ -275,7 +274,7 @@ public class Selector implements Selectable {
}
} catch (IOException e) {
String desc = socketDescription(channel);
- if(e instanceof EOFException)
+ if (e instanceof EOFException)
log.info("Connection {} disconnected", desc);
else
log.warn("Error in I/O with connection to {}", desc, e);
@@ -290,9 +289,9 @@ public class Selector implements Selectable {
private String socketDescription(SocketChannel channel) {
Socket socket = channel.socket();
- if(socket == null)
+ if (socket == null)
return "[unconnected socket]";
- else if(socket.getInetAddress() != null)
+ else if (socket.getInetAddress() != null)
return socket.getInetAddress().toString();
else
return socket.getLocalAddress().toString();
@@ -525,7 +524,7 @@ public class Selector implements Selectable {
String metricGrpName = metricGrpPrefix + "-node-metrics";
Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
- tags.put("node-id", "node-"+node);
+ tags.put("node-id", "node-" + node);
nodeRequest = this.metrics.sensor(nodeRequestName);
MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 109fc96..07aba71 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -16,10 +16,6 @@
*/
package org.apache.kafka.common.protocol;
-
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Identifiers for all the Kafka APIs
*/
@@ -37,16 +33,18 @@ public enum ApiKeys {
HEARTBEAT(12, "heartbeat");
private static ApiKeys[] codeToType;
- public static int MAX_API_KEY = -1;
+ public static final int MAX_API_KEY;
static {
+ int maxKey = -1;
for (ApiKeys key : ApiKeys.values()) {
- MAX_API_KEY = Math.max(MAX_API_KEY, key.id);
+ maxKey = Math.max(maxKey, key.id);
}
- codeToType = new ApiKeys[MAX_API_KEY+1];
+ codeToType = new ApiKeys[maxKey + 1];
for (ApiKeys key : ApiKeys.values()) {
codeToType[key.id] = key;
}
+ MAX_API_KEY = maxKey;
}
/** the perminant and immutable id of an API--this can't change ever */
[4/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
Posted by jk...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 7517b87..101f382 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -28,346 +28,347 @@ import org.apache.kafka.common.protocol.types.Schema;
public class Protocol {
- public static Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
- new Field("api_version", INT16, "The version of the API."),
- new Field("correlation_id",
- INT32,
- "A user-supplied integer value that will be passed back with the response"),
- new Field("client_id",
- STRING,
- "A user specified identifier for the client making the request."));
-
- public static Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
- INT32,
- "The user-supplied value passed in with the request"));
+ public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
+ new Field("api_version", INT16, "The version of the API."),
+ new Field("correlation_id",
+ INT32,
+ "A user-supplied integer value that will be passed back with the response"),
+ new Field("client_id",
+ STRING,
+ "A user specified identifier for the client making the request."));
+
+ public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
+ INT32,
+ "The user-supplied value passed in with the request"));
/* Metadata api */
- public static Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
- new ArrayOf(STRING),
- "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
+ public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
+ new ArrayOf(STRING),
+ "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
- public static Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
- new Field("host", STRING, "The hostname of the broker."),
- new Field("port", INT32, "The port on which the broker accepts requests."));
+ public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
+ new Field("host", STRING, "The hostname of the broker."),
+ new Field("port",
+ INT32,
+ "The port on which the broker accepts requests."));
- public static Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
- INT16,
- "The error code for the partition, if any."),
- new Field("partition_id", INT32, "The id of the partition."),
- new Field("leader",
- INT32,
- "The id of the broker acting as leader for this partition."),
- new Field("replicas",
- new ArrayOf(INT32),
- "The set of all nodes that host this partition."),
- new Field("isr",
- new ArrayOf(INT32),
- "The set of nodes that are in sync with the leader for this partition."));
-
- public static Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
- new Field("topic", STRING, "The name of the topic"),
- new Field("partition_metadata",
- new ArrayOf(PARTITION_METADATA_V0),
- "Metadata for each partition of the topic."));
-
- public static Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
- new ArrayOf(BROKER),
- "Host and port information for all brokers."),
- new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0)));
-
- public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 };
- public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 };
+ public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
+ INT16,
+ "The error code for the partition, if any."),
+ new Field("partition_id",
+ INT32,
+ "The id of the partition."),
+ new Field("leader",
+ INT32,
+ "The id of the broker acting as leader for this partition."),
+ new Field("replicas",
+ new ArrayOf(INT32),
+ "The set of all nodes that host this partition."),
+ new Field("isr",
+ new ArrayOf(INT32),
+ "The set of nodes that are in sync with the leader for this partition."));
+
+ public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code",
+ INT16,
+ "The error code for the given topic."),
+ new Field("topic", STRING, "The name of the topic"),
+ new Field("partition_metadata",
+ new ArrayOf(PARTITION_METADATA_V0),
+ "Metadata for each partition of the topic."));
+
+ public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
+ new ArrayOf(BROKER),
+ "Host and port information for all brokers."),
+ new Field("topic_metadata",
+ new ArrayOf(TOPIC_METADATA_V0)));
+
+ public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0};
+ public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0};
/* Produce api */
- public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
- new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
- new Field("record_set", BYTES)))));
-
- public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
- INT16,
- "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
- new Field("timeout", INT32, "The time to await a response in ms."),
- new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
- public static Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(new Schema(new Field("partition",
- INT32),
- new Field("error_code",
- INT16),
- new Field("base_offset",
- INT64))))))));
-
- public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
- public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
-
- /* Offset commit api */
- public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("offset",
- INT64,
- "Message offset to be committed."),
- new Field("timestamp",
- INT64,
- "Timestamp of the commit"),
- new Field("metadata",
- STRING,
- "Any associated metadata the client wants to keep."));
-
- public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- STRING,
- "Topic to commit."),
- new Field("partitions",
- new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
- "Partitions to commit offsets."));
-
- public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
- "Topics to commit offsets."));
-
- public static Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("group_generation_id",
+ public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
+ new Field("data",
+ new ArrayOf(new Schema(new Field("partition",
+ INT32),
+ new Field("record_set",
+ BYTES)))));
+
+ public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
+ INT16,
+ "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
+ new Field("timeout",
INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
- "Topics to commit offsets."));
-
- public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code",
- INT16));
-
- public static Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
-
- public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
-
- public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
- /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
- public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+ "The time to await a response in ms."),
+ new Field("topic_data",
+ new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
+ public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(new Schema(new Field("topic",
+ STRING),
+ new Field("partition_responses",
+ new ArrayOf(new Schema(new Field("partition",
+ INT32),
+ new Field("error_code",
+ INT16),
+ new Field("base_offset",
+ INT64))))))));
+
+ public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
+ public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
- /* Offset fetch api */
- public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."));
+ /* Offset commit api */
+ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("offset",
+ INT64,
+ "Message offset to be committed."),
+ new Field("timestamp",
+ INT64,
+ "Timestamp of the commit"),
+ new Field("metadata",
+ STRING,
+ "Any associated metadata the client wants to keep."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to commit."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
+ "Partitions to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("topics",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+ "Topics to commit offsets."));
- public static Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
STRING,
- "Topic to fetch offset."),
- new Field("partitions",
- new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
- "Partitions to fetch offsets."));
-
- public static Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("topics",
- new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch offsets."));
-
- public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("offset",
- INT64,
- "Last committed message offset."),
- new Field("metadata",
- STRING,
- "Any associated metadata the client wants to keep."),
- new Field("error_code",
- INT16));
+ "The consumer group id."),
+ new Field("group_generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("topics",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+ "Topics to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code", INT16));
- public static Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+ public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
- public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+ public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
- public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 };
- public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 };
+ public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1};
+ /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
+ public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+
+ /* Offset fetch api */
+ public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."));
+
+ public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to fetch offset."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
+ "Partitions to fetch offsets."));
+
+ public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("topics",
+ new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
+ "Topics to fetch offsets."));
+
+ public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("offset",
+ INT64,
+ "Last committed message offset."),
+ new Field("metadata",
+ STRING,
+ "Any associated metadata the client wants to keep."),
+ new Field("error_code", INT16));
+
+ public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+
+ public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0};
+ public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0};
/* List offset api */
- public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("timestamp", INT64, "Timestamp."),
+ new Field("max_num_offsets",
+ INT32,
+ "Maximum offsets to return."));
+
+ public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to list offset."),
+ new Field("partitions",
+ new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
+ "Partitions to list offset."));
+
+ public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+ INT32,
+ "Broker id of the follower. For normal consumers, use -1."),
+ new Field("topics",
+ new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
+ "Topics to list offsets."));
+
+ public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code", INT16),
+ new Field("offsets",
+ new ArrayOf(INT64),
+ "A list of offsets."));
+
+ public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
+
+ public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0};
+ public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0};
+
+ /* Fetch api */
+ public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
- new Field("timestamp",
+ new Field("fetch_offset",
INT64,
- "Timestamp."),
- new Field("max_num_offsets",
+ "Message offset."),
+ new Field("max_bytes",
INT32,
- "Maximum offsets to return."));
+ "Maximum bytes to fetch."));
- public static Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- STRING,
- "Topic to list offset."),
+ public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to fetch."),
new Field("partitions",
- new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
- "Partitions to list offset."));
+ new ArrayOf(FETCH_REQUEST_PARTITION_V0),
+ "Partitions to fetch."));
- public static Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+ public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
INT32,
"Broker id of the follower. For normal consumers, use -1."),
+ new Field("max_wait_time",
+ INT32,
+ "Maximum time in ms to wait for the response."),
+ new Field("min_bytes",
+ INT32,
+ "Minimum bytes to accumulate in the response."),
new Field("topics",
- new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
- "Topics to list offsets."));
+ new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+ "Topics to fetch."));
- public static Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
- new Field("error_code",
- INT16),
- new Field("offsets",
- new ArrayOf(INT64),
- "A list of offsets."));
+ new Field("error_code", INT16),
+ new Field("high_watermark",
+ INT64,
+ "Last committed offset."),
+ new Field("record_set", BYTES));
- public static Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
new Field("partition_responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
-
- public static Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
-
- public static Schema[] LIST_OFFSET_REQUEST = new Schema[] { LIST_OFFSET_REQUEST_V0 };
- public static Schema[] LIST_OFFSET_RESPONSE = new Schema[] { LIST_OFFSET_RESPONSE_V0 };
-
- /* Fetch api */
- public static Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("fetch_offset",
- INT64,
- "Message offset."),
- new Field("max_bytes",
- INT32,
- "Maximum bytes to fetch."));
-
- public static Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- STRING,
- "Topic to fetch."),
- new Field("partitions",
- new ArrayOf(FETCH_REQUEST_PARTITION_V0),
- "Partitions to fetch."));
-
- public static Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
- INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("max_wait_time",
- INT32,
- "Maximum time in ms to wait for the response."),
- new Field("min_bytes",
- INT32,
- "Minimum bytes to accumulate in the response."),
- new Field("topics",
- new ArrayOf(FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch."));
-
- public static Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code",
- INT16),
- new Field("high_watermark",
- INT64,
- "Last committed offset."),
- new Field("record_set", BYTES));
-
- public static Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
+ new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
- public static Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+ public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
- public static Schema[] FETCH_REQUEST = new Schema[] { FETCH_REQUEST_V0 };
- public static Schema[] FETCH_RESPONSE = new Schema[] { FETCH_RESPONSE_V0 };
+ public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
+ public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
/* Consumer metadata api */
- public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."));
+ public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."));
- public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code",
- INT16),
- new Field("coordinator",
- BROKER,
- "Host and port information for the coordinator for a consumer group."));
+ public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+ new Field("coordinator",
+ BROKER,
+ "Host and port information for the coordinator for a consumer group."));
- public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 };
- public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 };
+ public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
+ public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
/* Join group api */
- public static Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("session_timeout",
- INT32,
- "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
- new Field("topics",
- new ArrayOf(STRING),
- "An array of topics to subscribe to."),
- new Field("consumer_id",
- STRING,
- "The assigned consumer id or an empty string for a new consumer."),
- new Field("partition_assignment_strategy",
- STRING,
- "The strategy for the coordinator to assign partitions."));
-
- public static Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partitions", new ArrayOf(INT32)));
- public static Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code",
- INT16),
- new Field("group_generation_id",
- INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("assigned_partitions",
- new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+ public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("session_timeout",
+ INT32,
+ "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
+ new Field("topics",
+ new ArrayOf(STRING),
+ "An array of topics to subscribe to."),
+ new Field("consumer_id",
+ STRING,
+ "The assigned consumer id or an empty string for a new consumer."),
+ new Field("partition_assignment_strategy",
+ STRING,
+ "The strategy for the coordinator to assign partitions."));
+
+ public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partitions", new ArrayOf(INT32)));
+ public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+ new Field("group_generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("assigned_partitions",
+ new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
- public static Schema[] JOIN_GROUP_REQUEST = new Schema[] { JOIN_GROUP_REQUEST_V0 };
- public static Schema[] JOIN_GROUP_RESPONSE = new Schema[] { JOIN_GROUP_RESPONSE_V0 };
+ public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
+ public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
/* Heartbeat api */
- public static Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("group_generation_id",
- INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- STRING,
- "The consumer id assigned by the group coordinator."));
+ public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
+ new Field("group_generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."));
- public static Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code",
- INT16));
+ public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
- public static Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
- public static Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+ public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
+ public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
/* an array of all requests and responses with all schema versions */
- public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
- public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
+ public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
+ public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
/* the latest version of each api */
- public static short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
+ public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
static {
REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
@@ -401,11 +402,8 @@ public class Protocol {
/* sanity check that we have the same number of request and response versions for each api */
for (ApiKeys api : ApiKeys.values())
if (REQUESTS[api.id].length != RESPONSES[api.id].length)
- throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api "
- + api.name
- + " but "
- + RESPONSES[api.id].length
- + " response versions.");
+ throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name
+ + " but " + RESPONSES[api.id].length + " response versions.");
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index ee1f78f..ff89f0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -245,7 +245,7 @@ public class Struct {
public ByteBuffer[] toBytes() {
ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
writeTo(buffer);
- return new ByteBuffer[] { buffer };
+ return new ByteBuffer[] {buffer};
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
index c7bd2f8..1c9fbaa 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
*/
public class ByteBufferOutputStream extends OutputStream {
- private static float REALLOCATION_FACTOR = 1.1f;
+ private static final float REALLOCATION_FACTOR = 1.1f;
private ByteBuffer buffer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index d684e68..e570b29 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -34,16 +34,15 @@ public class Compressor {
static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
- private static float[] typeToRate;
- private static int MAX_TYPE_ID = -1;
+ private static final float[] TYPE_TO_RATE;
static {
+ int maxTypeId = -1;
+ for (CompressionType type : CompressionType.values())
+ maxTypeId = Math.max(maxTypeId, type.id);
+ TYPE_TO_RATE = new float[maxTypeId + 1];
for (CompressionType type : CompressionType.values()) {
- MAX_TYPE_ID = Math.max(MAX_TYPE_ID, type.id);
- }
- typeToRate = new float[MAX_TYPE_ID+1];
- for (CompressionType type : CompressionType.values()) {
- typeToRate[type.id] = type.rate;
+ TYPE_TO_RATE[type.id] = type.rate;
}
}
@@ -118,7 +117,7 @@ public class Compressor {
// update the compression ratio
float compressionRate = (float) buffer.position() / this.writtenUncompressed;
- typeToRate[type.id] = typeToRate[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
+ TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
}
}
@@ -192,7 +191,7 @@ public class Compressor {
return bufferStream.buffer().position();
} else {
// estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
- return (long) (writtenUncompressed * typeToRate[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+ return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
}
}
@@ -209,8 +208,8 @@ public class Compressor {
// dynamically load the snappy class to avoid runtime dependency
// on snappy if we are not using it
try {
- Class SnappyOutputStream = Class.forName("org.xerial.snappy.SnappyOutputStream");
- OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class, Integer.TYPE)
+ Class<?> outputStreamClass = Class.forName("org.xerial.snappy.SnappyOutputStream");
+ OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE)
.newInstance(buffer, bufferSize);
return new DataOutputStream(stream);
} catch (Exception e) {
@@ -218,7 +217,7 @@ public class Compressor {
}
case LZ4:
try {
- Class outputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream");
+ Class<?> outputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream");
OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
.newInstance(buffer);
return new DataOutputStream(stream);
@@ -244,8 +243,8 @@ public class Compressor {
// dynamically load the snappy class to avoid runtime dependency
// on snappy if we are not using it
try {
- Class SnappyInputStream = Class.forName("org.xerial.snappy.SnappyInputStream");
- InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class)
+ Class<?> inputStreamClass = Class.forName("org.xerial.snappy.SnappyInputStream");
+ InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
.newInstance(buffer);
return new DataInputStream(stream);
} catch (Exception e) {
@@ -254,7 +253,7 @@ public class Compressor {
case LZ4:
// dynamically load LZ4 class to avoid runtime dependency
try {
- Class inputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockInputStream");
+ Class<?> inputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream");
InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
.newInstance(buffer);
return new DataInputStream(stream);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
new file mode 100644
index 0000000..f480da2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ *
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
+ * Format Spec</a>
+ */
+public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+ public static final String PREMATURE_EOS = "Stream ended prematurely";
+ public static final String NOT_SUPPORTED = "Stream unsupported";
+ public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
+ public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
+
+ private final LZ4SafeDecompressor decompressor;
+ private final XXHash32 checksum;
+ private final byte[] buffer;
+ private final byte[] compressedBuffer;
+ private final int maxBlockSize;
+ private FLG flg;
+ private BD bd;
+ private int bufferOffset;
+ private int bufferSize;
+ private boolean finished;
+
+ /**
+ * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+ *
+ * @param in The stream to decompress
+ * @throws IOException
+ */
+ public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+ super(in);
+ decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+ checksum = XXHashFactory.fastestInstance().hash32();
+ readHeader();
+ maxBlockSize = bd.getBlockMaximumSize();
+ buffer = new byte[maxBlockSize];
+ compressedBuffer = new byte[maxBlockSize];
+ bufferOffset = 0;
+ bufferSize = 0;
+ finished = false;
+ }
+
+ /**
+ * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+ *
+ * @throws IOException
+ */
+ private void readHeader() throws IOException {
+ byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
+
+ // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
+ bufferOffset = 6;
+ if (in.read(header, 0, bufferOffset) != bufferOffset) {
+ throw new IOException(PREMATURE_EOS);
+ }
+
+ if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset - 6)) {
+ throw new IOException(NOT_SUPPORTED);
+ }
+ flg = FLG.fromByte(header[bufferOffset - 2]);
+ bd = BD.fromByte(header[bufferOffset - 1]);
+ // TODO read uncompressed content size, update flg.validate()
+ // TODO read dictionary id, update flg.validate()
+
+ // check stream descriptor hash
+ byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
+ header[bufferOffset++] = (byte) in.read();
+ if (hash != header[bufferOffset - 1]) {
+ throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+ }
+ }
+
+ /**
+ * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, and writes the
+ * result to a buffer.
+ *
+ * @throws IOException
+ */
+ private void readBlock() throws IOException {
+ int blockSize = Utils.readUnsignedIntLE(in);
+
+ // Check for EndMark
+ if (blockSize == 0) {
+ finished = true;
+ // TODO implement content checksum, update flg.validate()
+ return;
+ } else if (blockSize > maxBlockSize) {
+ throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
+ }
+
+ boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+ byte[] bufferToRead;
+ if (compressed) {
+ bufferToRead = compressedBuffer;
+ } else {
+ blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
+ bufferToRead = buffer;
+ bufferSize = blockSize;
+ }
+
+ if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+ throw new IOException(PREMATURE_EOS);
+ }
+
+ // verify checksum
+ if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+ throw new IOException(BLOCK_HASH_MISMATCH);
+ }
+
+ if (compressed) {
+ try {
+ bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+ } catch (LZ4Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ bufferOffset = 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (finished) {
+ return -1;
+ }
+ if (available() == 0) {
+ readBlock();
+ }
+ if (finished) {
+ return -1;
+ }
+ int value = buffer[bufferOffset++] & 0xFF;
+
+ return value;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ net.jpountz.util.Utils.checkRange(b, off, len);
+ if (finished) {
+ return -1;
+ }
+ if (available() == 0) {
+ readBlock();
+ }
+ if (finished) {
+ return -1;
+ }
+ len = Math.min(len, available());
+ System.arraycopy(buffer, bufferOffset, b, off, len);
+ bufferOffset += len;
+ return len;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (finished) {
+ return 0;
+ }
+ if (available() == 0) {
+ readBlock();
+ }
+ if (finished) {
+ return 0;
+ }
+ n = Math.min(n, available());
+ bufferOffset += n;
+ return n;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return bufferSize - bufferOffset;
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new RuntimeException("mark not supported");
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new RuntimeException("reset not supported");
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 0000000..6a2231f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ *
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
+ * Format Spec</a>
+ */
+public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+
+ public static final int MAGIC = 0x184D2204;
+ public static final int LZ4_MAX_HEADER_LENGTH = 19;
+ public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
+
+ public static final String CLOSED_STREAM = "The stream is already closed";
+
+ public static final int BLOCKSIZE_64KB = 4;
+ public static final int BLOCKSIZE_256KB = 5;
+ public static final int BLOCKSIZE_1MB = 6;
+ public static final int BLOCKSIZE_4MB = 7;
+
+ private final LZ4Compressor compressor;
+ private final XXHash32 checksum;
+ private final FLG flg;
+ private final BD bd;
+ private final byte[] buffer;
+ private final byte[] compressedBuffer;
+ private final int maxBlockSize;
+ private int bufferOffset;
+ private boolean finished;
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The output stream to compress
+ * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+ * values will generate an exception
+ * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
+ * every block of data
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+ super(out);
+ compressor = LZ4Factory.fastestInstance().fastCompressor();
+ checksum = XXHashFactory.fastestInstance().hash32();
+ bd = new BD(blockSize);
+ flg = new FLG(blockChecksum);
+ bufferOffset = 0;
+ maxBlockSize = bd.getBlockMaximumSize();
+ buffer = new byte[maxBlockSize];
+ compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
+ finished = false;
+ writeHeader();
+ }
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The stream to compress
+ * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+ * values will generate an exception
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
+ this(out, blockSize, false);
+ }
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The output stream to compress
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
+ this(out, BLOCKSIZE_64KB);
+ }
+
+ /**
+ * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
+ *
+ * @throws IOException
+ */
+ private void writeHeader() throws IOException {
+ Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
+ bufferOffset = 4;
+ buffer[bufferOffset++] = flg.toByte();
+ buffer[bufferOffset++] = bd.toByte();
+ // TODO write uncompressed content size, update flg.validate()
+ // TODO write dictionary id, update flg.validate()
+ // compute checksum on all descriptor fields
+ int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
+ buffer[bufferOffset++] = (byte) hash;
+ // write out frame descriptor
+ out.write(buffer, 0, bufferOffset);
+ bufferOffset = 0;
+ }
+
+ /**
+ * Compresses buffered data, optionally computes an XXHash32 checksum, and writes the result to the underlying
+ * {@link OutputStream}.
+ *
+ * @throws IOException
+ */
+ private void writeBlock() throws IOException {
+ if (bufferOffset == 0) {
+ return;
+ }
+
+ int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
+ byte[] bufferToWrite = compressedBuffer;
+ int compressMethod = 0;
+
+ // Store block uncompressed if compressed length is greater (incompressible)
+ if (compressedLength >= bufferOffset) {
+ bufferToWrite = buffer;
+ compressedLength = bufferOffset;
+ compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
+ }
+
+ // Write content
+ Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+ out.write(bufferToWrite, 0, compressedLength);
+
+ // Calculate and write block checksum
+ if (flg.isBlockChecksumSet()) {
+ int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
+ Utils.writeUnsignedIntLE(out, hash);
+ }
+ bufferOffset = 0;
+ }
+
+ /**
+ * Similar to the {@link #writeBlock()} method. Writes a 0-length block (without block checksum) to signal the end
+ * of the block stream.
+ *
+ * @throws IOException
+ */
+ private void writeEndMark() throws IOException {
+ Utils.writeUnsignedIntLE(out, 0);
+ // TODO implement content checksum, update flg.validate()
+ finished = true;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ensureNotFinished();
+ if (bufferOffset == maxBlockSize) {
+ writeBlock();
+ }
+ buffer[bufferOffset++] = (byte) b;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ net.jpountz.util.Utils.checkRange(b, off, len);
+ ensureNotFinished();
+
+ int bufferRemainingLength = maxBlockSize - bufferOffset;
+ // while b will fill the buffer
+ while (len > bufferRemainingLength) {
+ // fill remaining space in buffer
+ System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
+ bufferOffset = maxBlockSize;
+ writeBlock();
+ // compute new offset and length
+ off += bufferRemainingLength;
+ len -= bufferRemainingLength;
+ bufferRemainingLength = maxBlockSize;
+ }
+
+ System.arraycopy(b, off, buffer, bufferOffset, len);
+ bufferOffset += len;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!finished) {
+ writeBlock();
+ }
+ if (out != null) {
+ out.flush();
+ }
+ }
+
+ /**
+ * A simple state check to ensure the stream is still open.
+ */
+ private void ensureNotFinished() {
+ if (finished) {
+ throw new IllegalStateException(CLOSED_STREAM);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!finished) {
+ writeEndMark();
+ flush();
+ finished = true;
+ }
+ if (out != null) {
+ out.close();
+ out = null;
+ }
+ }
+
+ public static class FLG {
+
+ private static final int VERSION = 1;
+
+ private final int presetDictionary;
+ private final int reserved1;
+ private final int contentChecksum;
+ private final int contentSize;
+ private final int blockChecksum;
+ private final int blockIndependence;
+ private final int version;
+
+ public FLG() {
+ this(false);
+ }
+
+ public FLG(boolean blockChecksum) {
+ this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+ }
+
+ private FLG(int presetDictionary,
+ int reserved1,
+ int contentChecksum,
+ int contentSize,
+ int blockChecksum,
+ int blockIndependence,
+ int version) {
+ this.presetDictionary = presetDictionary;
+ this.reserved1 = reserved1;
+ this.contentChecksum = contentChecksum;
+ this.contentSize = contentSize;
+ this.blockChecksum = blockChecksum;
+ this.blockIndependence = blockIndependence;
+ this.version = version;
+ validate();
+ }
+
+ public static FLG fromByte(byte flg) {
+ int presetDictionary = (flg >>> 0) & 1;
+ int reserved1 = (flg >>> 1) & 1;
+ int contentChecksum = (flg >>> 2) & 1;
+ int contentSize = (flg >>> 3) & 1;
+ int blockChecksum = (flg >>> 4) & 1;
+ int blockIndependence = (flg >>> 5) & 1;
+ int version = (flg >>> 6) & 3;
+
+ return new FLG(presetDictionary,
+ reserved1,
+ contentChecksum,
+ contentSize,
+ blockChecksum,
+ blockIndependence,
+ version);
+ }
+
+ public byte toByte() {
+ return (byte) (((presetDictionary & 1) << 0) | ((reserved1 & 1) << 1) | ((contentChecksum & 1) << 2)
+ | ((contentSize & 1) << 3) | ((blockChecksum & 1) << 4) | ((blockIndependence & 1) << 5) | ((version & 3) << 6));
+ }
+
+ private void validate() {
+ if (presetDictionary != 0) {
+ throw new RuntimeException("Preset dictionary is unsupported");
+ }
+ if (reserved1 != 0) {
+ throw new RuntimeException("Reserved1 field must be 0");
+ }
+ if (contentChecksum != 0) {
+ throw new RuntimeException("Content checksum is unsupported");
+ }
+ if (contentSize != 0) {
+ throw new RuntimeException("Content size is unsupported");
+ }
+ if (blockIndependence != 1) {
+ throw new RuntimeException("Dependent block stream is unsupported");
+ }
+ if (version != VERSION) {
+ throw new RuntimeException(String.format("Version %d is unsupported", version));
+ }
+ }
+
+ public boolean isPresetDictionarySet() {
+ return presetDictionary == 1;
+ }
+
+ public boolean isContentChecksumSet() {
+ return contentChecksum == 1;
+ }
+
+ public boolean isContentSizeSet() {
+ return contentSize == 1;
+ }
+
+ public boolean isBlockChecksumSet() {
+ return blockChecksum == 1;
+ }
+
+ public boolean isBlockIndependenceSet() {
+ return blockIndependence == 1;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+ }
+
+ public static class BD {
+
+ private final int reserved2;
+ private final int blockSizeValue;
+ private final int reserved3;
+
+ public BD() {
+ this(0, BLOCKSIZE_64KB, 0);
+ }
+
+ public BD(int blockSizeValue) {
+ this(0, blockSizeValue, 0);
+ }
+
+ private BD(int reserved2, int blockSizeValue, int reserved3) {
+ this.reserved2 = reserved2;
+ this.blockSizeValue = blockSizeValue;
+ this.reserved3 = reserved3;
+ validate();
+ }
+
+ public static BD fromByte(byte bd) {
+ int reserved2 = (bd >>> 0) & 15;
+ int blockMaximumSize = (bd >>> 4) & 7;
+ int reserved3 = (bd >>> 7) & 1;
+
+ return new BD(reserved2, blockMaximumSize, reserved3);
+ }
+
+ private void validate() {
+ if (reserved2 != 0) {
+ throw new RuntimeException("Reserved2 field must be 0");
+ }
+ if (blockSizeValue < 4 || blockSizeValue > 7) {
+ throw new RuntimeException("Block size value must be between 4 and 7");
+ }
+ if (reserved3 != 0) {
+ throw new RuntimeException("Reserved3 field must be 0");
+ }
+ }
+
+ // 2^(2n+8)
+ public int getBlockMaximumSize() {
+ return 1 << ((2 * blockSizeValue) + 8);
+ }
+
+ public byte toByte() {
+ return (byte) (((reserved2 & 15) << 0) | ((blockSizeValue & 7) << 4) | ((reserved3 & 1) << 7));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index cc4084f..083e7a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -164,21 +164,21 @@ public class MemoryRecords implements Records {
@Override
public String toString() {
- Iterator<LogEntry> iter = iterator();
- StringBuilder builder = new StringBuilder();
- builder.append('[');
- while(iter.hasNext()) {
- LogEntry entry = iter.next();
- builder.append('(');
- builder.append("offset=");
- builder.append(entry.offset());
- builder.append(",");
- builder.append("record=");
- builder.append(entry.record());
- builder.append(")");
- }
- builder.append(']');
- return builder.toString();
+ Iterator<LogEntry> iter = iterator();
+ StringBuilder builder = new StringBuilder();
+ builder.append('[');
+ while (iter.hasNext()) {
+ LogEntry entry = iter.next();
+ builder.append('(');
+ builder.append("offset=");
+ builder.append(entry.offset());
+ builder.append(",");
+ builder.append("record=");
+ builder.append(entry.record());
+ builder.append(")");
+ }
+ builder.append(']');
+ return builder.toString();
}
public static class RecordsIterator extends AbstractIterator<LogEntry> {
@@ -218,8 +218,8 @@ public class MemoryRecords implements Records {
if (type == CompressionType.NONE) {
rec = buffer.slice();
int newPos = buffer.position() + size;
- if(newPos > buffer.limit())
- return allDone();
+ if (newPos > buffer.limit())
+ return allDone();
buffer.position(newPos);
rec.limit(size);
} else {
@@ -251,7 +251,7 @@ public class MemoryRecords implements Records {
}
private boolean innerDone() {
- return (innerIter == null || !innerIter.hasNext());
+ return innerIter == null || !innerIter.hasNext();
}
}
}