You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/01/30 03:39:32 UTC
[2/7] kafka git commit: KAFKA-1760: New consumer.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
deleted file mode 100644
index 29ad25e..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
-*/
-package org.apache.kafka.clients.consumer;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
-
-/**
- * TODO: Clean this after the consumer implementation is complete. Until then, it is useful to write some sample test code using the new APIs
- *
- */
-public class ConsumerExampleTest {
- /**
- * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load
- * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are automatically committed periodically,
- * as controlled by the auto.commit.interval.ms config
- */
-// @Test
-// public void testConsumerGroupManagementWithAutoOffsetCommits() {
-// Properties props = new Properties();
-// props.put("metadata.broker.list", "localhost:9092");
-// props.put("group.id", "test");
-// props.put("session.timeout.ms", "1000");
-// props.put("auto.commit.enable", "true");
-// props.put("auto.commit.interval.ms", "10000");
-// KafkaConsumer consumer = new KafkaConsumer(props);
-// // subscribe to some topics
-// consumer.subscribe("foo", "bar");
-// boolean isRunning = true;
-// while(isRunning) {
-// Map<String, ConsumerRecords> records = consumer.poll(100);
-// process(records);
-// }
-// consumer.close();
-// }
-
- /**
- * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load
- * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using the
- * commit() API. This example also demonstrates rewinding the consumer's offsets if processing of consumed messages fails.
- */
-// @Test
-// public void testConsumerGroupManagementWithManualOffsetCommit() {
-// Properties props = new Properties();
-// props.put("metadata.broker.list", "localhost:9092");
-// props.put("group.id", "test");
-// props.put("session.timeout.ms", "1000");
-// props.put("auto.commit.enable", "false");
-// KafkaConsumer consumer = new KafkaConsumer(props);
-// // subscribe to some topics
-// consumer.subscribe("foo", "bar");
-// int commitInterval = 100;
-// int numRecords = 0;
-// boolean isRunning = true;
-// Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
-// while(isRunning) {
-// Map<String, ConsumerRecords> records = consumer.poll(100);
-// try {
-// Map<TopicPartition, Long> lastConsumedOffsets = process(records);
-// consumedOffsets.putAll(lastConsumedOffsets);
-// numRecords += records.size();
-// // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
-// if(numRecords % commitInterval == 0)
-// consumer.commit(true);
-// } catch(Exception e) {
-// // rewind consumer's offsets for failed partitions
-// List<TopicPartition> failedPartitions = getFailedPartitions();
-// Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
-// for(TopicPartition failedPartition : failedPartitions) {
-// // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
-// // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
-// offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
-// }
-// // seek to new offsets only for partitions that failed the last process()
-// consumer.seek(offsetsToRewindTo);
-// }
-// }
-// consumer.close();
-// }
-
- private List<TopicPartition> getFailedPartitions() { return null; }
-
- /**
- * This example demonstrates the consumer can be used to leverage Kafka's group management functionality along with custom offset storage.
- * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to
- * plugin logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback
- * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance <i>and</i>
- * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
- */
-// @Test
-// public void testConsumerRebalanceWithCustomOffsetStore() {
-// Properties props = new Properties();
-// props.put("metadata.broker.list", "localhost:9092");
-// props.put("group.id", "test");
-// props.put("session.timeout.ms", "1000");
-// props.put("auto.commit.enable", "true");
-// props.put("auto.commit.interval.ms", "10000");
-// KafkaConsumer consumer = new KafkaConsumer(props,
-// new ConsumerRebalanceCallback() {
-// public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
-// Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
-// consumer.seek(lastCommittedOffsets);
-// }
-// public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
-// Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions); // implemented by the user
-// commitOffsetsToCustomStore(offsets); // implemented by the user
-// }
-// private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(Collection<TopicPartition> partitions) {
-// return null;
-// }
-// private Map<TopicPartition, Long> getLastConsumedOffsets(Collection<TopicPartition> partitions) { return null; }
-// private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
-// });
-// // subscribe to topics
-// consumer.subscribe("foo", "bar");
-// int commitInterval = 100;
-// int numRecords = 0;
-// boolean isRunning = true;
-// while(isRunning) {
-// Map<String, ConsumerRecords> records = consumer.poll(100);
-// Map<TopicPartition, Long> consumedOffsets = process(records);
-// numRecords += records.size();
-// // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
-// if(numRecords % commitInterval == 0)
-// commitOffsetsToCustomStore(consumedOffsets);
-// }
-// consumer.close();
-// }
-
- /**
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with Kafka based offset storage.
- * In this example, the assumption made is that the user chooses to use Kafka based offset management.
- */
-// @Test
-// public void testConsumerRewindWithGroupManagementAndKafkaOffsetStorage() {
-// Properties props = new Properties();
-// props.put("metadata.broker.list", "localhost:9092");
-// props.put("group.id", "test");
-// props.put("session.timeout.ms", "1000");
-// props.put("auto.commit.enable", "false");
-// KafkaConsumer consumer = new KafkaConsumer(props,
-// new ConsumerRebalanceCallback() {
-// boolean rewindOffsets = true;
-// public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
-// if(rewindOffsets) {
-// Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(null);
-// Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
-// consumer.seek(newOffsets);
-// }
-// }
-// public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
-// consumer.commit(true);
-// }
-// // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
-// private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
-// long numberOfMessagesToRewindBackTo) {
-// Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
-// for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet()) {
-// newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
-// }
-// return newOffsets;
-// }
-// });
-// // subscribe to topics
-// consumer.subscribe("foo", "bar");
-// int commitInterval = 100;
-// int numRecords = 0;
-// boolean isRunning = true;
-// while(isRunning) {
-// Map<String, ConsumerRecords> records = consumer.poll(100);
-// Map<TopicPartition, Long> consumedOffsets = process(records);
-// numRecords += records.size();
-// // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
-// if(numRecords % commitInterval == 0)
-// commitOffsetsToCustomStore(consumedOffsets);
-// }
-// consumer.close();
-// }
-
- /**
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest
- * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
- * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
- * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka
- * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does failure detection with group
- * management.
- */
-// @Test
-// public void testConsumerWithKafkaBasedOffsetManagement() {
-// Properties props = new Properties();
-// props.put("metadata.broker.list", "localhost:9092");
-// props.put("group.id", "test");
-// props.put("auto.commit.enable", "true");
-// props.put("auto.commit.interval.ms", "10000");
-// KafkaConsumer consumer = new KafkaConsumer(props);
-// // subscribe to some partitions of topic foo
-// TopicPartition partition0 = new TopicPartition("foo", 0);
-// TopicPartition partition1 = new TopicPartition("foo", 1);
-// TopicPartition[] partitions = new TopicPartition[2];
-// partitions[0] = partition0;
-// partitions[1] = partition1;
-// consumer.subscribe(partitions);
-// // find the last committed offsets for partitions 0,1 of topic foo
-// Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(null);
-// // seek to the last committed offsets to avoid duplicates
-// consumer.seek(lastCommittedOffsets);
-// // find the offsets of the latest available messages to know where to stop consumption
-// Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, null);
-// boolean isRunning = true;
-// while(isRunning) {
-// Map<String, ConsumerRecords> records = consumer.poll(100);
-// Map<TopicPartition, Long> consumedOffsets = process(records);
-// for(TopicPartition partition : partitions) {
-// if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
-// isRunning = false;
-// else
-// isRunning = true;
-// }
-// }
-// consumer.close();
-// }
-
- /**
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest
- * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
- * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
- * This example assumes that the user chooses to use custom offset storage.
- */
- @Test
- public void testConsumerWithCustomOffsetManagement() {
-// Properties props = new Properties();
-// props.put("metadata.broker.list", "localhost:9092");
-// KafkaConsumer consumer = new KafkaConsumer(props);
-// // subscribe to some partitions of topic foo
-// TopicPartition partition0 = new TopicPartition("foo", 0);
-// TopicPartition partition1 = new TopicPartition("foo", 1);
-// TopicPartition[] partitions = new TopicPartition[2];
-// partitions[0] = partition0;
-// partitions[1] = partition1;
-// consumer.subscribe(partitions);
-// Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
-// // seek to the last committed offsets to avoid duplicates
-// consumer.seek(lastCommittedOffsets);
-// // find the offsets of the latest available messages to know where to stop consumption
-// Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, null);
-// boolean isRunning = true;
-// while(isRunning) {
-// Map<String, ConsumerRecords> records = consumer.poll(100);
-// Map<TopicPartition, Long> consumedOffsets = process(records);
-// // commit offsets for partitions 0,1 for topic foo to custom store
-// commitOffsetsToCustomStore(consumedOffsets);
-// for(TopicPartition partition : partitions) {
-// if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
-// isRunning = false;
-// else
-// isRunning = true;
-// }
-// }
-// consumer.close();
- }
-
- private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore() { return null; }
- private void commitOffsetsToCustomStore(Map<TopicPartition, Long> consumedOffsets) {}
- private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
- Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
- for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
- List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
- for(int i = 0;i < recordsPerTopic.size();i++) {
- ConsumerRecord record = recordsPerTopic.get(i);
- // process record
- try {
- processedOffsets.put(record.topicAndPartition(), record.offset());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- return processedOffsets;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
new file mode 100644
index 0000000..e51d2df
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -0,0 +1,32 @@
+package org.apache.kafka.clients.consumer;
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+public class MockConsumerTest {
+
+ private MockConsumer<String, String> consumer = new MockConsumer<String, String>();
+
+ @Test
+ public void testSimpleMock() {
+ consumer.subscribe("topic");
+ assertEquals(0, consumer.poll(1000).count());
+ ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
+ ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");
+ consumer.addRecord(rec1);
+ consumer.addRecord(rec2);
+ ConsumerRecords<String, String> recs = consumer.poll(1);
+ Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
+ assertEquals(rec1, iter.next());
+ assertEquals(rec2, iter.next());
+ assertFalse(iter.hasNext());
+ assertEquals(1L, consumer.position(new TopicPartition("test", 0)));
+ consumer.commit(CommitType.SYNC);
+ assertEquals(1L, consumer.committed(new TopicPartition("test", 0)));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
new file mode 100644
index 0000000..864f1c7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -0,0 +1,61 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import static org.junit.Assert.*;
+import static java.util.Arrays.asList;
+
+import java.util.Collections;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+public class SubscriptionStateTest {
+
+ private final SubscriptionState state = new SubscriptionState();
+ private final TopicPartition tp0 = new TopicPartition("test", 0);
+ private final TopicPartition tp1 = new TopicPartition("test", 1);
+
+ @Test
+ public void partitionSubscription() {
+ state.subscribe(tp0);
+ assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+ state.committed(tp0, 1);
+ state.fetched(tp0, 1);
+ state.consumed(tp0, 1);
+ assertAllPositions(tp0, 1L);
+ state.unsubscribe(tp0);
+ assertTrue(state.assignedPartitions().isEmpty());
+ assertAllPositions(tp0, null);
+ }
+
+ public void topicSubscription() {
+ state.subscribe("test");
+ assertEquals(1, state.subscribedTopics().size());
+ assertTrue(state.assignedPartitions().isEmpty());
+ assertTrue(state.partitionsAutoAssigned());
+ state.changePartitionAssignment(asList(tp0));
+ state.committed(tp0, 1);
+ state.fetched(tp0, 1);
+ state.consumed(tp0, 1);
+ assertAllPositions(tp0, 1L);
+ state.changePartitionAssignment(asList(tp1));
+ assertAllPositions(tp0, null);
+ assertEquals(Collections.singleton(tp1), state.assignedPartitions());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void cantChangeFetchPositionForNonAssignedPartition() {
+ state.fetched(tp0, 1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void cantChangeConsumedPositionForNonAssignedPartition() {
+ state.consumed(tp0, 1);
+ }
+
+ public void assertAllPositions(TopicPartition tp, Long offset) {
+ assertEquals(offset, state.committed(tp));
+ assertEquals(offset, state.fetched(tp));
+ assertEquals(offset, state.consumed(tp));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
index 1236803..77b23e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
@@ -43,7 +43,7 @@ public class BufferPoolTest {
*/
@Test
public void testSimple() throws Exception {
- int totalMemory = 64 * 1024;
+ long totalMemory = 64 * 1024;
int size = 1024;
BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
ByteBuffer buffer = pool.allocate(size);
@@ -100,7 +100,7 @@ public class BufferPoolTest {
ByteBuffer buffer = pool.allocate(1024);
CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
- assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1, allocation.getCount());
+ assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
doDealloc.countDown(); // return the memory
allocation.await();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 3676b05..d3377ef 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -38,7 +38,7 @@ public class MockProducerTest {
Future<RecordMetadata> metadata = producer.send(record);
assertTrue("Send should be immediately complete", metadata.isDone());
assertFalse("Send should be successful", isError(metadata));
- assertEquals("Offset should be 0", 0, metadata.get().offset());
+ assertEquals("Offset should be 0", 0L, metadata.get().offset());
assertEquals(topic, metadata.get().topic());
assertEquals("We should have the record in our history", asList(record), producer.history());
producer.clear();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index 1d077fd..82d8083 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -1,30 +1,23 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import java.util.List;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -34,7 +27,6 @@ import org.junit.Test;
public class PartitionerTest {
private byte[] key = "key".getBytes();
- private byte[] value = "value".getBytes();
private Partitioner partitioner = new Partitioner();
private Node node0 = new Node(0, "localhost", 99);
private Node node1 = new Node(1, "localhost", 100);
@@ -48,33 +40,28 @@ public class PartitionerTest {
@Test
public void testUserSuppliedPartitioning() {
- assertEquals("If the user supplies a partition we should use it.",
- 0,
- partitioner.partition(new ProducerRecord<byte[], byte[]>("test", 0, key, value), cluster));
+ assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition("test", key, 0, cluster));
}
@Test
public void testKeyPartitionIsStable() {
- int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, value), cluster);
- assertEquals("Same key should yield same partition",
- partition,
- partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, "value2".getBytes()), cluster));
+ int partition = partitioner.partition("test", key, null, cluster);
+ assertEquals("Same key should yield same partition", partition, partitioner.partition("test", key, null, cluster));
}
@Test
public void testRoundRobinIsStable() {
- int startPart = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
+ int startPart = partitioner.partition("test", null, null, cluster);
for (int i = 1; i <= 100; i++) {
- int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
- assertEquals("Should yield a different partition each call with round-robin partitioner",
- partition, (startPart + i) % 2);
- }
+ int partition = partitioner.partition("test", null, null, cluster);
+ assertEquals("Should yield a different partition each call with round-robin partitioner", partition, (startPart + i) % 2);
+ }
}
@Test
public void testRoundRobinWithDownNode() {
for (int i = 0; i < partitions.size(); i++) {
- int part = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
+ int part = partitioner.partition("test", null, null, cluster);
assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 66cbdf5..888b929 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -72,14 +72,14 @@ public class SenderTest {
@Test
public void testSimple() throws Exception {
- int offset = 0;
+ long offset = 0;
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
sender.run(time.milliseconds());
- assertEquals("All requests completed.", offset, client.inFlightRequestCount());
+ assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
sender.run(time.milliseconds());
assertTrue("Request should be completed", future.isDone());
assertEquals(offset, future.get().offset());
@@ -110,7 +110,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // reconnect
sender.run(time.milliseconds()); // resend
assertEquals(1, client.inFlightRequestCount());
- int offset = 0;
+ long offset = 0;
client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
sender.run(time.milliseconds());
assertTrue("Request should have retried and completed", future.isDone());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 3c442a2..16d3fed 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -110,18 +110,18 @@ public class ConfigDefTest {
@Test(expected = ConfigException.class)
public void testInvalidDefaultRange() {
- ConfigDef def = new ConfigDef().define("name", Type.INT, -1, Range.between(0,10), Importance.HIGH, "docs");
+ new ConfigDef().define("name", Type.INT, -1, Range.between(0,10), Importance.HIGH, "docs");
}
@Test(expected = ConfigException.class)
public void testInvalidDefaultString() {
- ConfigDef def = new ConfigDef().define("name", Type.STRING, "bad", ValidString.in(Arrays.asList("valid", "values")), Importance.HIGH, "docs");
+ new ConfigDef().define("name", Type.STRING, "bad", ValidString.in("valid", "values"), Importance.HIGH, "docs");
}
@Test
public void testValidators() {
testValidators(Type.INT, Range.between(0,10), 5, new Object[]{1, 5, 9}, new Object[]{-1, 11});
- testValidators(Type.STRING, ValidString.in(Arrays.asList("good", "values", "default")), "default",
+ testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs"});
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 74c1957..a14659a 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -12,7 +12,6 @@
*/
package org.apache.kafka.common.network;
-import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -74,7 +73,7 @@ public class SelectorTest {
// disconnect
this.server.closeConnections();
while (!selector.disconnected().contains(node))
- selector.poll(1000L, EMPTY);
+ selector.poll(1000L);
// reconnect and do another request
blockingConnect(node);
@@ -89,7 +88,8 @@ public class SelectorTest {
int node = 0;
blockingConnect(node);
selector.disconnect(node);
- selector.poll(10, asList(createSend(node, "hello1")));
+ selector.send(createSend(node, "hello1"));
+ selector.poll(10);
assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
assertEquals("There should be a disconnect", 1, selector.disconnected().size());
assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
@@ -104,7 +104,9 @@ public class SelectorTest {
public void testCantSendWithInProgress() throws Exception {
int node = 0;
blockingConnect(node);
- selector.poll(1000L, asList(createSend(node, "test1"), createSend(node, "test2")));
+ selector.send(createSend(node, "test1"));
+ selector.send(createSend(node, "test2"));
+ selector.poll(1000L);
}
/**
@@ -112,7 +114,8 @@ public class SelectorTest {
*/
@Test(expected = IllegalStateException.class)
public void testCantSendWithoutConnecting() throws Exception {
- selector.poll(1000L, asList(createSend(0, "test")));
+ selector.send(createSend(0, "test"));
+ selector.poll(1000L);
}
/**
@@ -131,7 +134,7 @@ public class SelectorTest {
int node = 0;
selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE);
while (selector.disconnected().contains(node))
- selector.poll(1000L, EMPTY);
+ selector.poll(1000L);
}
/**
@@ -152,14 +155,13 @@ public class SelectorTest {
int[] requests = new int[conns];
int[] responses = new int[conns];
int responseCount = 0;
- List<NetworkSend> sends = new ArrayList<NetworkSend>();
for (int i = 0; i < conns; i++)
- sends.add(createSend(i, i + "-" + 0));
+ selector.send(createSend(i, i + "-" + 0));
// loop until we complete all requests
while (responseCount < conns * reqs) {
// do the i/o
- selector.poll(0L, sends);
+ selector.poll(0L);
assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
@@ -175,12 +177,11 @@ public class SelectorTest {
}
// prepare new sends for the next round
- sends.clear();
for (NetworkSend send : selector.completedSends()) {
int dest = send.destination();
requests[dest]++;
if (requests[dest] < reqs)
- sends.add(createSend(dest, dest + "-" + requests[dest]));
+ selector.send(createSend(dest, dest + "-" + requests[dest]));
}
}
}
@@ -212,10 +213,34 @@ public class SelectorTest {
blockingConnect(0);
}
+ @Test
+ public void testMute() throws Exception {
+ blockingConnect(0);
+ blockingConnect(1);
+
+ selector.send(createSend(0, "hello"));
+ selector.send(createSend(1, "hi"));
+
+ selector.mute(1);
+
+ while (selector.completedReceives().isEmpty())
+ selector.poll(5);
+ assertEquals("We should have only one response", 1, selector.completedReceives().size());
+ assertEquals("The response should not be from the muted node", 0, selector.completedReceives().get(0).source());
+
+ selector.unmute(1);
+ do {
+ selector.poll(5);
+ } while (selector.completedReceives().isEmpty());
+ assertEquals("We should have only one response", 1, selector.completedReceives().size());
+ assertEquals("The response should be from the previously muted node", 1, selector.completedReceives().get(0).source());
+ }
+
private String blockingRequest(int node, String s) throws IOException {
- selector.poll(1000L, asList(createSend(node, s)));
+ selector.send(createSend(node, s));
+ selector.poll(1000L);
while (true) {
- selector.poll(1000L, EMPTY);
+ selector.poll(1000L);
for (NetworkReceive receive : selector.completedReceives())
if (receive.source() == node)
return asString(receive);
@@ -226,7 +251,7 @@ public class SelectorTest {
private void blockingConnect(int node) throws IOException {
selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
while (!selector.connected().contains(node))
- selector.poll(10000L, EMPTY);
+ selector.poll(10000L);
}
private NetworkSend createSend(int node, String s) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index a39fab5..4c2ea34 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.common.utils;
+import java.util.Arrays;
+import java.util.Collections;
+
import org.junit.Test;
import static org.apache.kafka.common.utils.Utils.getHost;
@@ -48,4 +51,11 @@ public class UtilsTest {
assertEquals("[::1]:1234", formatAddress("::1", 1234));
assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", formatAddress("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678));
}
+
+ @Test
+ public void testJoin() {
+ assertEquals("", Utils.join(Collections.emptyList(), ","));
+ assertEquals("1", Utils.join(Arrays.asList("1"), ","));
+ assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ","));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index d61de52..ea89b06 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.test;
@@ -26,13 +22,13 @@ import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.utils.Time;
-
/**
* A fake selector to use for testing
*/
public class MockSelector implements Selectable {
private final Time time;
+ private final List<NetworkSend> initiatedSends = new ArrayList<NetworkSend>();
private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>();
private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
private final List<Integer> disconnected = new ArrayList<Integer>();
@@ -68,8 +64,14 @@ public class MockSelector implements Selectable {
}
@Override
- public void poll(long timeout, List<NetworkSend> sends) throws IOException {
- this.completedSends.addAll(sends);
+ public void send(NetworkSend send) {
+ this.initiatedSends.add(send);
+ }
+
+ @Override
+ public void poll(long timeout) throws IOException {
+ this.completedSends.addAll(this.initiatedSends);
+ this.initiatedSends.clear();
time.sleep(timeout);
}
@@ -101,4 +103,20 @@ public class MockSelector implements Selectable {
return connected;
}
+ @Override
+ public void mute(int id) {
+ }
+
+ @Override
+ public void unmute(int id) {
+ }
+
+ @Override
+ public void muteAll() {
+ }
+
+ @Override
+ public void unmuteAll() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
index 6d00ed0..a3b1b78 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -63,7 +63,7 @@ case class ConsumerMetadataRequest(group: String,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
// return ConsumerCoordinatorNotAvailable for all uncaught errors
- val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode)
+ val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index 84f6017..24aaf95 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -40,7 +40,7 @@ object ConsumerMetadataResponse {
}
-case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int = 0)
+case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int)
extends RequestOrResponse() {
def sizeInBytes =
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b230e9a..e6ad8be 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -90,7 +90,7 @@ class Partition(val topic: String,
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
val offsetMap = checkpoint.read
if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
- warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
+ info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
val localReplica = new Replica(replicaId, this, time, offset, Some(log))
addReplicaIfNotExists(localReplica)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index fbef34c..14b22ab 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -141,7 +141,7 @@ class RequestSendThread(val controllerId: Int,
connectToBroker(toBroker, channel)
isSendSuccessful = false
// backoff before retrying the connection and send
- Utils.swallow(Thread.sleep(300))
+ Utils.swallowTrace(Thread.sleep(300))
}
}
if (receive != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 4631bc7..8b67aee 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -179,12 +179,12 @@ object LogConfig {
.define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc)
.define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
MinCleanableRatioDoc)
- .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(asList(Compact, Delete)), MEDIUM,
+ .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(Compact, Delete), MEDIUM,
CompactDoc)
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
MEDIUM, UncleanLeaderElectionEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc)
- .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(seqAsJavaList(BrokerCompressionCodec.brokerCompressionOptions)), MEDIUM, CompressionTypeDoc)
+ .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
}
def configNames() = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ec8d9f7..48bc435 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,6 +17,12 @@
package kafka.server
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.JoinGroupResponse
+import org.apache.kafka.common.requests.HeartbeatResponse
+import org.apache.kafka.common.requests.ResponseHeader
+import org.apache.kafka.common.protocol.types.Struct
+
import kafka.api._
import kafka.common._
import kafka.log._
@@ -26,6 +32,9 @@ import kafka.network.RequestChannel.Response
import kafka.controller.KafkaController
import kafka.utils.{SystemTime, Logging}
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic._
import scala.collection._
import org.I0Itec.zkclient.ZkClient
@@ -43,6 +52,7 @@ class KafkaApis(val requestChannel: RequestChannel,
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
val metadataCache = new MetadataCache
+ private var consumerGroupGenerationId = 0
/**
* Top-level method that handles all requests and multiplexes to the right api
@@ -62,6 +72,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
+ case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
+ case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -442,6 +454,23 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
+ def handleJoinGroupRequest(request: RequestChannel.Request) {
+ val joinGroupReq = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
+ val topics = JavaConversions.asScalaIterable(joinGroupReq.body.topics()).toSet
+ val partitions = this.replicaManager.logManager.allLogs.filter(log => topics.contains(log.topicAndPartition.topic))
+ val partitionList = partitions.map(_.topicAndPartition).map(tp => new org.apache.kafka.common.TopicPartition(tp.topic, tp.partition)).toBuffer
+ this.consumerGroupGenerationId += 1
+ val response = new JoinGroupResponse(ErrorMapping.NoError, this.consumerGroupGenerationId, joinGroupReq.body.consumerId, JavaConversions.asJavaList(partitionList))
+ val send = new BoundedByteBufferSend(new JoinGroupResponseAndHeader(joinGroupReq.correlationId, response))
+ requestChannel.sendResponse(new RequestChannel.Response(request, send))
+ }
+
+ def handleHeartbeatRequest(request: RequestChannel.Request) {
+ val hbReq = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
+ val send = new BoundedByteBufferSend(new HeartbeatResponseAndHeader(hbReq.correlationId, new HeartbeatResponse(Errors.NONE.code)))
+ requestChannel.sendResponse(new RequestChannel.Response(request, send))
+ }
+
def close() {
// TODO currently closing the API is an no-op since the API no longer maintain any modules
// maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e58fbb9..fb948b9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -327,7 +327,7 @@ class ReplicaManager(val config: KafkaConfig,
BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
- .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
+ .format(messages.sizeInBytes, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
(topicAndPartition, LogAppendResult(info))
} catch {
// NOTE: Failed produce requests metric is not incremented for known exceptions
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index e455cb9..910691e 100644
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -113,8 +113,6 @@ object ConsoleConsumer extends Logging {
KafkaMetricsReporter.startReporters(verifiableProps)
}
-
-
val consumerProps = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 093c800..c39c067 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -17,14 +17,21 @@
package kafka.tools
-import java.util.concurrent.CountDownLatch
+import scala.collection.JavaConversions._
import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.ClosedByInterruptException
import org.apache.log4j.Logger
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.Records
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
import kafka.message.Message
import kafka.utils.{ZkUtils, CommandLineUtils}
import java.util.{ Random, Properties }
-import kafka.consumer._
+import kafka.consumer.Consumer
+import kafka.consumer.ConsumerConnector
+import kafka.consumer.KafkaStream
+import kafka.consumer.ConsumerTimeoutException
import java.text.SimpleDateFormat
/**
@@ -42,50 +49,98 @@ object ConsumerPerformance {
if (!config.hideHeader) {
if (!config.showDetailedStats)
- println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+ println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
else
- println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+ println("time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
}
- // clean up zookeeper state for this group id for every perf run
- ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId)
-
- val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
-
- val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
- var threadList = List[ConsumerPerfThread]()
- for ((topic, streamList) <- topicMessageStreams)
- for (i <- 0 until streamList.length)
- threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
- totalMessagesRead, totalBytesRead)
-
- logger.info("Sleeping for 1 second.")
- Thread.sleep(1000)
- logger.info("starting threads")
- val startMs = System.currentTimeMillis
- for (thread <- threadList)
- thread.start
-
- for (thread <- threadList)
- thread.join
-
- val endMs = System.currentTimeMillis
- val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0
+ var startMs, endMs = 0L
+ if(config.useNewConsumer) {
+ val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
+ consumer.subscribe(config.topic)
+ startMs = System.currentTimeMillis
+ consume(consumer, config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
+ endMs = System.currentTimeMillis
+ } else {
+ import kafka.consumer.ConsumerConfig
+ val consumerConfig = new ConsumerConfig(config.props)
+ val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+ val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
+ var threadList = List[ConsumerPerfThread]()
+ for ((topic, streamList) <- topicMessageStreams)
+ for (i <- 0 until streamList.length)
+ threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead)
+
+ logger.info("Sleeping for 1 second.")
+ Thread.sleep(1000)
+ logger.info("starting threads")
+ startMs = System.currentTimeMillis
+ for (thread <- threadList)
+ thread.start
+ for (thread <- threadList)
+ thread.join
+ endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs
+ }
+ val elapsedSecs = (endMs - startMs) / 1000.0
if (!config.showDetailedStats) {
val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
- println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
- config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get,
- totalMessagesRead.get / elapsedSecs))
+ println(("%s, %s, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
+ totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs))
}
System.exit(0)
}
+
+ def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
+ var bytesRead = 0L
+ var messagesRead = 0L
+ val startMs = System.currentTimeMillis
+ var lastReportTime: Long = startMs
+ var lastBytesRead = 0L
+ var lastMessagesRead = 0L
+ var lastConsumed = System.currentTimeMillis
+ while(messagesRead < count && lastConsumed >= System.currentTimeMillis - timeout) {
+ val records = consumer.poll(100)
+ if(records.count() > 0)
+ lastConsumed = System.currentTimeMillis
+ for(record <- records) {
+ messagesRead += 1
+ if(record.key != null)
+ bytesRead += record.key.size
+ if(record.value != null)
+ bytesRead += record.value.size
+
+ if (messagesRead % config.reportingInterval == 0) {
+ if (config.showDetailedStats)
+ printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis, config.dateFormat)
+ lastReportTime = System.currentTimeMillis
+ lastMessagesRead = messagesRead
+ lastBytesRead = bytesRead
+ }
+ }
+ }
+ totalMessagesRead.set(messagesRead)
+ totalBytesRead.set(bytesRead)
+ }
+
+ def printProgressMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
+ startMs: Long, endMs: Long, dateFormat: SimpleDateFormat) = {
+ val elapsedMs: Double = endMs - startMs
+ val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
+ val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
+ println(("%s, %d, %.4f, %.4f, %d, %.4f").format(dateFormat.format(endMs), id, totalMBRead,
+ 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
+ }
class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
- val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
- "Multiple URLS can be given to allow fail-over.")
+ val zkConnectOpt = parser.accepts("zookeeper", "The connection string for the zookeeper connection in the form host:port. " +
+ "Multiple URLS can be given to allow fail-over. This option is only used with the old consumer.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
+ val bootstrapServersOpt = parser.accepts("broker-list", "A broker list to use for connecting if using the new consumer.")
+ .withRequiredArg()
+ .describedAs("host")
+ .ofType(classOf[String])
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
.withRequiredArg
.describedAs("topic")
@@ -117,20 +172,35 @@ object ConsumerPerformance {
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
+ val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
val options = parser.parse(args: _*)
- CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, zkConnectOpt)
-
+ CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
+
+ val useNewConsumer = options.has(useNewConsumerOpt)
+
val props = new Properties
- props.put("group.id", options.valueOf(groupIdOpt))
- props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
- props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
- props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
- props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
- props.put("consumer.timeout.ms", "5000")
- props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
- val consumerConfig = new ConsumerConfig(props)
+ if(useNewConsumer) {
+ import org.apache.kafka.clients.consumer.ConsumerConfig
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
+ props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString)
+ props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString)
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (options.has(resetBeginningOffsetOpt)) "latest" else "earliest")
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
+ props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
+ } else {
+ CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+ props.put("group.id", options.valueOf(groupIdOpt))
+ props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
+ props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
+ props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
+ props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
+ props.put("consumer.timeout.ms", "1000")
+ props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
+ }
val numThreads = options.valueOf(numThreadsOpt).intValue
val topic = options.valueOf(topicOpt)
val numMessages = options.valueOf(numMessagesOpt).longValue
@@ -161,7 +231,7 @@ object ConsumerPerformance {
if (messagesRead % config.reportingInterval == 0) {
if (config.showDetailedStats)
- printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis)
+ printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis, config.dateFormat)
lastReportTime = System.currentTimeMillis
lastMessagesRead = messagesRead
lastBytesRead = bytesRead
@@ -176,18 +246,9 @@ object ConsumerPerformance {
totalMessagesRead.addAndGet(messagesRead)
totalBytesRead.addAndGet(bytesRead)
if (config.showDetailedStats)
- printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis)
+ printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis, config.dateFormat)
}
- private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
- startMs: Long, endMs: Long) = {
- val elapsedMs = endMs - startMs
- val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
- val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
- println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
- config.consumerConfig.fetchMessageMaxBytes, totalMBRead,
- 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
- }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 7602b8d..900f7df 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -79,7 +79,7 @@ object SimpleConsumerPerformance {
done = true
else
// we only did one fetch so we find the offset for the first (head) messageset
- offset += messageSet.validBytes
+ offset = messageSet.last.nextOffset
totalBytesRead += bytesRead
totalMessagesRead += messagesRead
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 9a16343..7ceadcc 100644
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -41,6 +41,11 @@ trait Scheduler {
def shutdown()
/**
+ * Check if the scheduler has been started
+ */
+ def isStarted: Boolean
+
+ /**
* Schedule a task
* @param name The name of this task
* @param delay The amount of time to wait before the first execution
@@ -63,13 +68,13 @@ trait Scheduler {
class KafkaScheduler(val threads: Int,
val threadNamePrefix: String = "kafka-scheduler-",
daemon: Boolean = true) extends Scheduler with Logging {
- @volatile private var executor: ScheduledThreadPoolExecutor = null
+ private var executor: ScheduledThreadPoolExecutor = null
private val schedulerThreadId = new AtomicInteger(0)
override def startup() {
debug("Initializing task scheduler.")
this synchronized {
- if(executor != null)
+ if(isStarted)
throw new IllegalStateException("This scheduler has already been started!")
executor = new ScheduledThreadPoolExecutor(threads)
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
@@ -83,34 +88,45 @@ class KafkaScheduler(val threads: Int,
override def shutdown() {
debug("Shutting down task scheduler.")
- ensureStarted
- executor.shutdown()
- executor.awaitTermination(1, TimeUnit.DAYS)
- this.executor = null
+ this synchronized {
+ if(isStarted) {
+ executor.shutdown()
+ executor.awaitTermination(1, TimeUnit.DAYS)
+ this.executor = null
+ }
+ }
}
def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
debug("Scheduling task %s with initial delay %d ms and period %d ms."
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
- ensureStarted
- val runnable = Utils.runnable {
- try {
- trace("Begining execution of scheduled task '%s'.".format(name))
- fun()
- } catch {
- case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
- } finally {
- trace("Completed execution of scheduled task '%s'.".format(name))
+ this synchronized {
+ ensureStarted
+ val runnable = Utils.runnable {
+ try {
+ trace("Begining execution of scheduled task '%s'.".format(name))
+ fun()
+ } catch {
+ case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
+ } finally {
+ trace("Completed execution of scheduled task '%s'.".format(name))
+ }
}
+ if(period >= 0)
+ executor.scheduleAtFixedRate(runnable, delay, period, unit)
+ else
+ executor.schedule(runnable, delay, unit)
+ }
+ }
+
+ def isStarted: Boolean = {
+ this synchronized {
+ executor != null
}
- if(period >= 0)
- executor.scheduleAtFixedRate(runnable, delay, period, unit)
- else
- executor.schedule(runnable, delay, unit)
}
private def ensureStarted = {
- if(executor == null)
+ if(!isStarted)
throw new IllegalStateException("Kafka scheduler has not been started")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
new file mode 100644
index 0000000..798f035
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.CommitType
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
+import scala.collection.mutable.Buffer
+import scala.collection.JavaConversions._
+import java.util.ArrayList
+import java.util.Arrays
+import org.junit.Assert._
+import kafka.utils.TestUtils
+import kafka.utils.Logging
+import kafka.server.OffsetManager
+
+/**
+ * Integration tests for the new consumer that cover basic usage as well as server failures
+ */
+class ConsumerTest extends IntegrationTestHarness with Logging {
+
+ val producerCount = 1
+ val consumerCount = 2
+ val serverCount = 3
+
+ val topic = "topic"
+ val part = 0
+ val tp = new TopicPartition(topic, part)
+
+ // configure the servers and clients
+ this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
+ this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
+ this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
+ this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+ this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+ this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+ override def setUp() {
+ super.setUp()
+ // this will trigger the creation of the consumer offsets topic
+ this.consumers(0).partitionsFor(OffsetManager.OffsetsTopicName)
+ }
+
+ def testSimpleConsumption() {
+ val numRecords = 10000
+ sendRecords(numRecords)
+
+ assertEquals(0, this.consumers(0).subscriptions.size)
+ this.consumers(0).subscribe(tp)
+ assertEquals(1, this.consumers(0).subscriptions.size)
+
+ this.consumers(0).seek(tp, 0)
+ consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
+ }
+
+ def testAutoOffsetReset() {
+ sendRecords(1)
+ this.consumers(0).subscribe(tp)
+ consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+ }
+
+ def testSeek() {
+ val consumer = this.consumers(0)
+ val totalRecords = 50L
+ sendRecords(totalRecords.toInt)
+ consumer.subscribe(tp)
+
+ consumer.seekToEnd(tp)
+ assertEquals(totalRecords, consumer.position(tp))
+ assertFalse(consumer.poll(totalRecords).iterator().hasNext())
+
+ consumer.seekToBeginning(tp)
+ assertEquals(0, consumer.position(tp), 0)
+ consumeRecords(consumer, numRecords = 1, startingOffset = 0)
+
+ val mid = totalRecords / 2
+ consumer.seek(tp, mid)
+ assertEquals(mid, consumer.position(tp))
+ consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
+ }
+
+ def testGroupConsumption() {
+ // we need to do this test with only one server since we have the hack join group
+ // that just assigns the partition hosted on the local machine (with two we might get the wrong machine
+ this.servers.last.shutdown()
+ this.servers.head.shutdown()
+ sendRecords(10)
+ this.consumers(0).subscribe(topic)
+ consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+ }
+
+ def testPositionAndCommit() {
+ sendRecords(5)
+
+ // committed() on a partition with no committed offset throws an exception
+ intercept[NoOffsetForPartitionException] {
+ this.consumers(0).committed(new TopicPartition(topic, 15))
+ }
+
+ // position() on a partition that we aren't subscribed to throws an exception
+ intercept[IllegalArgumentException] {
+ this.consumers(0).position(new TopicPartition(topic, 15))
+ }
+
+ this.consumers(0).subscribe(tp)
+
+ assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
+ this.consumers(0).commit(CommitType.SYNC)
+ assertEquals(0L, this.consumers(0).committed(tp))
+
+ consumeRecords(this.consumers(0), 5, 0)
+ assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
+ this.consumers(0).commit(CommitType.SYNC)
+ assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp));
+
+ sendRecords(1)
+
+ // another consumer in the same group should get the same position
+ this.consumers(1).subscribe(tp)
+ consumeRecords(this.consumers(1), 1, 5)
+ }
+
+ def testPartitionsFor() {
+ val numParts = 2;
+ TestUtils.createTopic(this.zkClient, topic, numParts, 1, this.servers)
+ val parts = this.consumers(0).partitionsFor(topic)
+ assertNotNull(parts)
+ assertEquals(2, parts.length)
+ assertNull(this.consumers(0).partitionsFor("non-existant-topic"))
+ }
+
+ def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(numRecords = 1000)
+
+ /*
+ * 1. Produce a bunch of messages
+ * 2. Then consume the messages while killing and restarting brokers at random
+ */
+ def consumeWithBrokerFailures(numRecords: Int) {
+ TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
+ sendRecords(numRecords)
+ this.producers.map(_.close)
+ var consumed = 0
+ val consumer = this.consumers(0)
+ consumer.subscribe(topic)
+ while (consumed < numRecords) {
+ // check that we are getting the messages in order
+ for (record <- consumer.poll(200)) {
+ assertEquals(consumed.toLong, record.offset())
+ consumed += 1
+ }
+ consumer.commit(CommitType.SYNC);
+
+ /* restart any dead brokers, and kill a broker (with probability 1/3) */
+ restartDeadBrokers()
+ if (TestUtils.random.nextInt(3) == 0) {
+ info("Killing broker")
+ killRandomBroker()
+ }
+ }
+ }
+
+ def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(20)
+
+ def seekAndCommitWithBrokerFailures(numIters: Int) {
+ // create a topic and send it some data
+ val numRecords = 1000
+ TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
+ sendRecords(numRecords)
+ this.producers.map(_.close)
+
+ val consumer = this.consumers(0)
+ consumer.subscribe(tp)
+ consumer.seek(tp, 0)
+ for (iter <- 0 until numIters) {
+ val coin = TestUtils.random.nextInt(4)
+ if (coin == 0) {
+ info("Seeking to end of log")
+ consumer.seekToEnd()
+ assertEquals(1000.toLong, consumer.position(tp))
+ } else if (coin == 1) {
+ val pos = TestUtils.random.nextInt(numRecords).toLong
+ info("Seeking to " + pos)
+ consumer.seek(tp, pos)
+ assertEquals(pos, consumer.position(tp))
+ } else if (coin == 2) {
+ info("Committing offset.")
+ consumer.commit(CommitType.SYNC)
+ assertEquals(consumer.position(tp), consumer.committed(tp))
+ } else {
+ restartDeadBrokers()
+ killRandomBroker()
+ }
+ }
+ }
+
+ def testPartitionReassignmentCallback() {
+ val callback = new TestConsumerReassignmentCallback()
+ this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
+ val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumer0.subscribe("test")
+
+ // the initial subscription should cause a callback execution
+ while(callback.callsToAssigned == 0)
+ consumer0.poll(50)
+
+ // get metadata for the topic
+ var parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
+ while(parts == null)
+ parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
+ assertEquals(1, parts.size)
+ assertNotNull(parts(0).leader())
+
+ // shutdown the co-ordinator
+ val coordinator = parts(0).leader().id()
+ this.servers(coordinator).shutdown()
+
+ // this should cause another callback execution
+ while(callback.callsToAssigned < 2)
+ consumer0.poll(50)
+ assertEquals(2, callback.callsToAssigned)
+ assertEquals(2, callback.callsToRevoked)
+
+ consumer0.close()
+ }
+
+ class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
+ var callsToAssigned = 0
+ var callsToRevoked = 0
+ def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+ info("onPartitionsAssigned called.")
+ callsToAssigned += 1
+ }
+ def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+ info("onPartitionsRevoked called.")
+ callsToRevoked += 1
+ }
+ }
+
+ private def sendRecords(numRecords: Int) {
+ val futures = (0 until numRecords).map { i =>
+ this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+ }
+ futures.map(_.get)
+ }
+
+ private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) {
+ val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+ val maxIters = numRecords * 300
+ var iters = 0
+ while (records.size < numRecords) {
+ for (record <- consumer.poll(50))
+ records.add(record)
+ if(iters > maxIters)
+ throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.");
+ iters += 1
+ }
+ for (i <- 0 until numRecords) {
+ val record = records.get(i)
+ val offset = startingOffset + i
+ assertEquals(topic, record.topic())
+ assertEquals(part, record.partition())
+ assertEquals(offset.toLong, record.offset())
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
new file mode 100644
index 0000000..5650b4a
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.scalatest.junit.JUnit3Suite
+import collection._
+import kafka.utils.TestUtils
+import java.util.Properties
+import java.util.Arrays
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.KafkaProducer
+import kafka.server.KafkaConfig
+import kafka.integration.KafkaServerTestHarness
+import scala.collection.mutable.Buffer
+
+/**
+ * A helper class for writing integration tests that involve producers, consumers, and servers
+ */
+trait IntegrationTestHarness extends KafkaServerTestHarness {
+
+ val producerCount: Int
+ val consumerCount: Int
+ val serverCount: Int
+ lazy val producerConfig = new Properties
+ lazy val consumerConfig = new Properties
+ lazy val serverConfig = new Properties
+ override lazy val configs = {
+ val cfgs = TestUtils.createBrokerConfigs(serverCount)
+ cfgs.map(_.putAll(serverConfig))
+ cfgs.map(new KafkaConfig(_))
+ }
+
+ var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+ var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+ override def setUp() {
+ super.setUp()
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+ for(i <- 0 until producerCount)
+ producers += new KafkaProducer(producerConfig)
+ for(i <- 0 until consumerCount)
+ consumers += new KafkaConsumer(consumerConfig)
+ }
+
+ override def tearDown() {
+ producers.map(_.close())
+ consumers.map(_.close())
+ super.tearDown()
+ }
+
+}