You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2016/05/23 20:21:48 UTC
[1/3] crunch git commit: CRUNCH-606: Kafka Source for Crunch which
supports reading data as BytesWritable
Repository: crunch
Updated Branches:
refs/heads/master c09c4ee2d -> 360d72a4f
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
new file mode 100644
index 0000000..ce97ec1
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.crunch.kafka.ClusterTest.writeData;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordsIterableIT {
+
+ // Mockito-created consumer; the mock-based tests script its poll() responses.
+ @Mock
+ private Consumer<String, String> mockedConsumer;
+
+ // Mocked record batch that the stubbed poll() calls hand back.
+ @Mock
+ private ConsumerRecords<String, String> records;
+
+ // Exposes the running test's method name; setup() uses it as the topic name.
+ @Rule
+ public TestName testName = new TestName();
+
+ // Topic for the current test (one topic per test method).
+ private String topic;
+ // Per-partition start/stop offsets; setup() seeds partitions 0-3 with 0 and 100.
+ private Map<TopicPartition, Long> startOffsets;
+ private Map<TopicPartition, Long> stopOffsets;
+ // Per-partition (start, stop) pairs passed to KafkaRecordsIterable by the mock-based tests.
+ private Map<TopicPartition, Pair<Long, Long>> offsets;
+ // Real consumer; only assigned by the tests that talk to the test cluster, otherwise null.
+ private Consumer<String, String> consumer;
+ // Properties returned by ClusterTest.getConsumerProperties().
+ private Properties props;
+ // Copy of props used to construct the real KafkaConsumer instances.
+ private Properties consumerProps;
+
+ /**
+ * Runs once before any test in this class; delegates to ClusterTest.startTest().
+ * NOTE(review): presumably this brings up the shared embedded Kafka cluster - confirm in ClusterTest.
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ ClusterTest.startTest();
+ }
+
+ /**
+ * Runs once after all tests in this class; delegates to ClusterTest.endTest().
+ * NOTE(review): presumably this tears the shared test cluster down - confirm in ClusterTest.
+ */
+ @AfterClass
+ public static void cleanup() throws Exception {
+ ClusterTest.endTest();
+ }
+
+ @Before
+ public void setup() {
+ topic = testName.getMethodName();
+
+ props = ClusterTest.getConsumerProperties();
+
+ startOffsets = new HashMap<>();
+ stopOffsets = new HashMap<>();
+ offsets = new HashMap<>();
+ for (int i = 0; i < 4; i++) {
+ TopicPartition tp = new TopicPartition(topic, i);
+ startOffsets.put(tp, 0L);
+ stopOffsets.put(tp, 100L);
+
+ offsets.put(tp, Pair.of(0L, 100L));
+ }
+
+
+ consumerProps = new Properties();
+ consumerProps.putAll(props);
+ }
+
+ /**
+ * Per-test teardown hook; currently a no-op.
+ * NOTE(review): tests that create a real KafkaConsumer never close it here. Confirm whether
+ * KafkaRecordsIterable closes the consumer itself before adding a close() call.
+ */
+ @After
+ public void shutdown() {
+ }
+
+
+ /**
+  * Constructing the iterable with a null consumer must fail with IllegalArgumentException.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void nullConsumer() {
+   // Explicit type arguments instead of the raw type: no rawtypes/unchecked warning, and
+   // consistent with how the other tests in this class instantiate the iterable.
+   new KafkaRecordsIterable<String, String>(null, offsets, new Properties());
+ }
+
+ /**
+ * Constructing the iterable with a null offsets map must fail with IllegalArgumentException.
+ * The consumer field is not assigned by this test, so it is also null here.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void nullOffsets() {
+ new KafkaRecordsIterable<>(consumer, null, new Properties());
+ }
+
+ /**
+  * Constructing the iterable with an empty offsets map must fail with IllegalArgumentException,
+  * even when a real consumer is supplied.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void emptyOffsets() {
+   consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+   try {
+     // The constructor is expected to throw, so the result is deliberately not kept.
+     new KafkaRecordsIterable<String, String>(consumer,
+         Collections.<TopicPartition, Pair<Long, Long>>emptyMap(), new Properties());
+   } finally {
+     // Release the real consumer's resources; the expected exception still propagates.
+     consumer.close();
+   }
+ }
+
+ /**
+  * Constructing the iterable with null properties must fail with IllegalArgumentException.
+  * The consumer field is not assigned by this test, so it is also null here.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void nullProperties() {
+   // Diamond instead of the raw type, matching nullOffsets().
+   new KafkaRecordsIterable<>(consumer, offsets, null);
+ }
+
+ /**
+  * Writes 10 x 100 records, then iterates the full earliest-to-latest range of every partition
+  * and checks that each written key is seen exactly once.
+  */
+ @Test
+ public void iterateOverValues() {
+   consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+   final int loops = 10;
+   final int numPerLoop = 100;
+   List<String> expectedKeys = writeData(props, topic, "batch", loops, numPerLoop);
+
+   startOffsets = getStartOffsets(props, topic);
+   stopOffsets = getStopOffsets(props, topic);
+
+   // Pair each partition's earliest offset with its latest offset.
+   Map<TopicPartition, Pair<Long, Long>> ranges = new HashMap<>();
+   for (TopicPartition partition : startOffsets.keySet()) {
+     ranges.put(partition, Pair.of(startOffsets.get(partition), stopOffsets.get(partition)));
+   }
+
+   Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, ranges, new Properties());
+
+   int seen = 0;
+   for (Pair<String, String> record : data) {
+     String key = record.first();
+     assertThat(expectedKeys, hasItem(key));
+     // Removing the key proves it is not delivered twice.
+     assertTrue(expectedKeys.remove(key));
+     seen++;
+   }
+
+   assertThat(seen, is(loops * numPerLoop));
+   assertThat(expectedKeys.size(), is(0));
+ }
+
+ /**
+  * Writes a single record, then iterates the full earliest-to-latest range of every partition
+  * and checks that exactly that one key is returned.
+  */
+ @Test
+ public void iterateOverOneValue() {
+   consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+   int loops = 1;
+   int numPerLoop = 1;
+   int total = loops * numPerLoop;
+   List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+   startOffsets = getStartOffsets(props, topic);
+   stopOffsets = getStopOffsets(props, topic);
+
+   // Pair each partition's earliest offset with its latest offset.
+   // (The leftover System.out.println debug output that used to live here was removed.)
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+   for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+     offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+   }
+
+   Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());
+
+   int count = 0;
+   for (Pair<String, String> event : data) {
+     assertThat(keys, hasItem(event.first()));
+     // Removing the key proves it is not delivered twice.
+     assertTrue(keys.remove(event.first()));
+     count++;
+   }
+
+   assertThat(count, is(total));
+   assertThat(keys.size(), is(0));
+ }
+
+ /**
+  * Writes data but requests a range whose start and stop are both the earliest offset, so the
+  * iterable must yield no records.
+  */
+ @Test
+ public void iterateOverNothing() {
+   consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+   writeData(props, topic, "batch", 10, 100);
+
+   // Start and stop are both the earliest offsets, i.e. every range is empty.
+   startOffsets = getStartOffsets(props, topic);
+   stopOffsets = getStartOffsets(props, topic);
+
+   Map<TopicPartition, Pair<Long, Long>> emptyRanges = new HashMap<>();
+   for (TopicPartition partition : startOffsets.keySet()) {
+     emptyRanges.put(partition, Pair.of(startOffsets.get(partition), stopOffsets.get(partition)));
+   }
+
+   Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, emptyRanges, new Properties());
+
+   int seen = 0;
+   for (Pair<String, String> ignored : data) {
+     seen++;
+   }
+
+   assertThat(seen, is(0));
+ }
+
+ /**
+ * Writes 10 x 100 records but requests only the first numPerPartition offsets of each
+ * partition, then checks that exactly partitions * numPerPartition records are returned.
+ * NOTE(review): assumes writeData places at least numPerPartition records in every partition - confirm.
+ */
+ @Test
+ public void iterateOverPartial() {
+ consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+ int loops = 10;
+ int numPerLoop = 100;
+ int numPerPartition = 50;
+
+ writeData(props, topic, "batch", loops, numPerLoop);
+
+ Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+ //stop numPerPartition offsets after each partition's earliest offset
+ startOffsets = getStartOffsets(props, topic);
+ for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+ offsets.put(entry.getKey(), Pair.of(entry.getValue(), entry.getValue() + numPerPartition));
+ }
+
+ Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+ int count = 0;
+ for (Pair<String, String> event : data) {
+ count++;
+ }
+
+ assertThat(count, is(startOffsets.size() * numPerPartition));
+ }
+
+ /**
+ * Captures the offset range after a first batch is written, writes a second batch, and checks
+ * that iteration returns every first-batch key and none of the second-batch keys.
+ */
+ @Test
+ public void dontIteratePastStop() {
+ consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+ int loops = 10;
+ int numPerLoop = 100;
+
+ List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
+
+ //capture the earliest-to-latest range now so it only covers the first batch
+ startOffsets = getStartOffsets(props, topic);
+ stopOffsets = getStopOffsets(props, topic);
+
+ Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+ offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+ }
+
+ // written after the stop offsets were captured, so these keys must never be returned
+ List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
+
+ Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+ int count = 0;
+ for (Pair<String, String> event : data) {
+ assertThat(keys, hasItem(event.first()));
+ assertTrue(keys.remove(event.first()));
+ assertThat(secondKeys, not(hasItem(event.first())));
+ count++;
+ }
+
+ assertThat(count, is(loops * numPerLoop));
+ assertThat(keys.size(), is(0));
+ }
+
+ /**
+ * Writes a first batch, uses the latest offsets at that point as the start, writes a second
+ * batch, and checks that iteration returns every second-batch key and no first-batch key.
+ */
+ @Test
+ public void iterateSkipInitialValues() {
+ consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+ int loops = 10;
+ int numPerLoop = 100;
+
+ List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
+
+ //start at the latest offsets after the first batch so the first batch is skipped
+ startOffsets = getStopOffsets(props, topic);
+
+ List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
+
+ stopOffsets = getStopOffsets(props, topic);
+
+
+ Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+ offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+ }
+
+
+ Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets,
+ new Properties());
+
+ int count = 0;
+ for (Pair<String, String> event : data) {
+ assertThat(secondKeys, hasItem(event.first()));
+ assertTrue(secondKeys.remove(event.first()));
+ assertThat(keys, not(hasItem(event.first())));
+ count++;
+ }
+
+ assertThat(count, is(loops * numPerLoop));
+ assertThat(secondKeys.size(), is(0));
+ }
+
+ /**
+ * Mock-based test: poll() first returns null, then throws TimeoutException twice, then returns
+ * a batch of 100 records (offsets 0-24 on partitions 0-3), then returns null. The iterable is
+ * expected to survive the two failures and yield every record of the batch.
+ */
+ @Test
+ public void iterateValuesWithExceptions() {
+ List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+
+ // 25 records for each of the 4 partitions, offsets 0..24
+ for(int i = 0; i < 25; i++){
+ returnedRecords.add(new ConsumerRecord<String, String>(topic, 0, i, "key", null));
+ returnedRecords.add(new ConsumerRecord<String, String>(topic, 1, i, "key", null));
+ returnedRecords.add(new ConsumerRecord<String, String>(topic, 2, i, "key", null));
+ returnedRecords.add(new ConsumerRecord<String, String>(topic, 3, i, "key", null));
+ }
+
+ // request offsets [0, 25] on every partition
+ offsets = new HashMap<>();
+ offsets.put(new TopicPartition(topic, 0), Pair.of(0L, 25L));
+ offsets.put(new TopicPartition(topic, 1), Pair.of(0L, 25L));
+ offsets.put(new TopicPartition(topic, 2), Pair.of(0L, 25L));
+ offsets.put(new TopicPartition(topic, 3), Pair.of(0L, 25L));
+
+ when(records.isEmpty()).thenReturn(false);
+ when(records.iterator()).thenReturn(returnedRecords.iterator());
+ // the order of the stubbed responses below is what the test exercises
+ when(mockedConsumer.poll(Matchers.anyLong()))
+ //request for the first poll
+ .thenReturn(null)
+ //fail twice
+ .thenThrow(new TimeoutException("fail1"))
+ .thenThrow(new TimeoutException("fail2"))
+ //request that will give data
+ .thenReturn(records)
+ // shows to stop retrieving data
+ .thenReturn(null);
+
+ Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+ int count = 0;
+ for (Pair<String, String> event : data) {
+ count++;
+ }
+
+ //should have gotten every record returned by the successful poll
+ assertThat(count, is(returnedRecords.size()));
+ }
+
+ /**
+ * Mock-based test: every record handed back by poll() has an offset one past the stop offset
+ * configured in setup(), so the iterable is expected to yield nothing.
+ */
+ @Test
+ public void iterateValuesAfterStopOffsets() {
+ List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+ // one record per partition, positioned at stop offset + 1
+ for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
+ returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
+ entry.getKey().partition(), entry.getValue() + 1, "key", null));
+ }
+
+ when(records.isEmpty()).thenReturn(false);
+ when(records.iterator()).thenReturn(returnedRecords.iterator());
+ // two polls return the batch, the third returns null
+ when(mockedConsumer.poll(Matchers.anyLong())).thenReturn(records).thenReturn(records).thenReturn(null);
+
+ Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+ int count = 0;
+ for (Pair<String, String> event : data) {
+ count++;
+ }
+
+ assertThat(count, is(0));
+
+ }
+
+ /**
+ * Mock-based test: after an initial null poll, every following poll() throws TimeoutException.
+ * The first next() call is expected to surface a RetriableException once retries run out.
+ * NOTE(review): six failures are stubbed; presumably the default retry limit is five - confirm
+ * against KafkaRecordsIterable.
+ */
+ @Test(expected = RetriableException.class)
+ public void iterateRetriableExceptionMaxExceeded() {
+ List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+ for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
+ returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
+ entry.getKey().partition(), entry.getValue() + 1, "key", null));
+ }
+
+ when(records.isEmpty()).thenReturn(false);
+ when(records.iterator()).thenReturn(returnedRecords.iterator());
+ when(mockedConsumer.poll(Matchers.anyLong()))
+ //for the first poll call
+ .thenReturn(null)
+ //keep failing until the retries are exhausted
+ .thenThrow(new TimeoutException("fail1"))
+ .thenThrow(new TimeoutException("fail2"))
+ .thenThrow(new TimeoutException("fail3"))
+ .thenThrow(new TimeoutException("fail4"))
+ .thenThrow(new TimeoutException("fail5"))
+ .thenThrow(new TimeoutException("fail6"));
+
+ Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+ data.iterator().next();
+ }
+
+ /**
+ * Returns the per-partition offsets that KafkaUtils.getBrokerOffsets reports for
+ * OffsetRequest.LatestTime() on the given topic; the tests use them as stop offsets.
+ */
+ private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
+ return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
+ }
+
+ /**
+ * Returns the per-partition offsets that KafkaUtils.getBrokerOffsets reports for
+ * OffsetRequest.EarliestTime() on the given topic; the tests use them as start offsets.
+ */
+ private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
+ return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
new file mode 100644
index 0000000..3800c24
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+
+public class KafkaSourceIT {
+
+ // Supplies the output location used by sourceReadDataThroughPipeline().
+ @Rule
+ public TemporaryPath path = new TemporaryPath();
+
+ // Exposes the running test's method name; setupTest() uses it as the topic name.
+ @Rule
+ public TestName testName = new TestName();
+
+ // Consumer properties from ClusterTest.getConsumerProperties().
+ private Properties consumerProps;
+ // Assigned in setupTest(); both tests declare a local "config" that shadows this field.
+ private Configuration config;
+ // Topic for the current test (one topic per test method).
+ private String topic;
+
+ /**
+ * Runs once before any test in this class; delegates to ClusterTest.startTest().
+ * NOTE(review): presumably this brings up the shared embedded Kafka cluster - confirm in ClusterTest.
+ */
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterTest.startTest();
+ }
+
+ /**
+ * Runs once after all tests in this class; delegates to ClusterTest.endTest().
+ * NOTE(review): presumably this tears the shared test cluster down - confirm in ClusterTest.
+ */
+ @AfterClass
+ public static void cleanup() throws Exception {
+ ClusterTest.endTest();
+ }
+
+ /**
+ * Per-test fixture: names the topic after the test method and fetches the consumer
+ * properties and consumer configuration from ClusterTest.
+ */
+ @Before
+ public void setupTest() {
+ topic = testName.getMethodName();
+ consumerProps = ClusterTest.getConsumerProperties();
+ config = ClusterTest.getConsumerConfig();
+ }
+
+ /**
+  * Writes 10 x 10 records, reads the full earliest-to-latest range through a KafkaSource by
+  * materializing the PTable, and checks that every written key comes back exactly once.
+  */
+ @Test
+ public void sourceReadData() {
+   List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+   Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+   Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+   // Pair each partition's earliest offset with its latest offset.
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+   for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+     Long endingOffset = endOffsets.get(entry.getKey());
+     offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+   }
+
+   Configuration config = ClusterTest.getConf();
+
+   Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
+   pipeline.enableDebug();
+
+   TableSource<BytesWritable, BytesWritable> kafkaSource = new KafkaSource(consumerProps, offsets);
+
+   PTable<BytesWritable, BytesWritable> read = pipeline.read(kafkaSource);
+
+   Set<String> keysRead = new HashSet<>();
+   int numRecordsFound = 0;
+   for (Pair<BytesWritable, BytesWritable> values : read.materialize()) {
+     BytesWritable rawKey = values.first();
+     // getBytes() exposes the backing array, which may be longer than the valid data, so only
+     // the first getLength() bytes are decoded (platform charset, as before).
+     String key = new String(rawKey.getBytes(), 0, rawKey.getLength());
+     assertThat(keys, hasItem(key));
+     numRecordsFound++;
+     keysRead.add(key);
+   }
+
+   assertThat(numRecordsFound, is(keys.size()));
+   // The set size equals the record count only when no key was read twice.
+   assertThat(keysRead.size(), is(keys.size()));
+
+   pipeline.done();
+ }
+
+
+ /**
+  * Writes 10 x 10 records, runs a pipeline that reads them through a KafkaSource, converts
+  * each key to a String and writes it to a text file, then reads the file back and checks
+  * that every written key is present exactly once.
+  */
+ @Test
+ public void sourceReadDataThroughPipeline() {
+   List<String> writtenKeys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+   Map<TopicPartition, Long> earliest = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+   Map<TopicPartition, Long> latest = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+   // Pair each partition's earliest offset with its latest offset.
+   Map<TopicPartition, Pair<Long, Long>> ranges = new HashMap<>();
+   for (TopicPartition partition : earliest.keySet()) {
+     ranges.put(partition, Pair.of(earliest.get(partition), latest.get(partition)));
+   }
+
+   Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, ClusterTest.getConf());
+   pipeline.enableDebug();
+
+   TableSource<BytesWritable, BytesWritable> source = new KafkaSource(consumerProps, ranges);
+   PTable<BytesWritable, BytesWritable> read = pipeline.read(source);
+
+   Path out = path.getPath("out");
+   read.parallelDo(new SimpleConvertFn(), Avros.strings()).write(To.textFile(out));
+
+   pipeline.run();
+
+   PCollection<String> persistedKeys = pipeline.read(From.textFile(out));
+
+   Set<String> distinctKeys = new HashSet<>();
+   int recordCount = 0;
+   for (String persisted : persistedKeys.materialize()) {
+     assertThat(writtenKeys, hasItem(persisted));
+     recordCount++;
+     distinctKeys.add(persisted);
+   }
+
+   assertThat(recordCount, is(writtenKeys.size()));
+   // The set size equals the record count only when no key was read twice.
+   assertThat(distinctKeys.size(), is(writtenKeys.size()));
+
+   pipeline.done();
+ }
+
+
+ private static class SimpleConvertFn extends MapFn<Pair<BytesWritable, BytesWritable>, String> {
+ @Override
+ public String map(Pair<BytesWritable, BytesWritable> input) {
+ return new String(input.first().getBytes());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
new file mode 100644
index 0000000..38c3fce
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.cluster.Broker;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+
+public class KafkaUtilsIT {
+
+ // Exposes the running test's method name; setup() derives the topic name from it.
+ @Rule
+ public TestName testName = new TestName();
+
+ // Topic for the current test ("topic-" + test method name).
+ private String topic;
+ // Broker built in startup() from the first entry of the consumer's bootstrap servers.
+ private static Broker broker;
+
+ /**
+  * Runs once before the tests: starts the test cluster via ClusterTest.startTest(), then builds
+  * a Broker (id 0, PLAINTEXT) from the first host:port entry of the consumer's bootstrap servers.
+  */
+ @BeforeClass
+ public static void startup() throws Exception {
+   ClusterTest.startTest();
+
+   String bootstrapServers = ClusterTest.getConsumerProperties()
+       .getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+   // Only the first "host:port" entry of the comma-separated list is used.
+   String[] hostAndPort = bootstrapServers.split(",")[0].split(":");
+
+   broker = new Broker(0, hostAndPort[0], Integer.parseInt(hostAndPort[1]), SecurityProtocol.PLAINTEXT);
+ }
+
+ /**
+ * Runs once after all tests in this class; delegates to ClusterTest.endTest().
+ * NOTE(review): presumably this tears the shared test cluster down - confirm in ClusterTest.
+ */
+ @AfterClass
+ public static void shutdown() throws Exception {
+ ClusterTest.endTest();
+ }
+
+ /**
+ * Per-test fixture: derives a topic name of the form "topic-" + test method name.
+ */
+ @Before
+ public void setup() throws IOException {
+ topic = "topic-" + testName.getMethodName();
+ }
+
+ /**
+  * A property set on a Hadoop Configuration must be visible, under the same key, in the
+  * Properties returned by KafkaUtils.getKafkaConnectionProperties.
+  */
+ @Test
+ public void getKafkaProperties() {
+   final String key = "fake.kafka.property";
+   final String expected = testName.getMethodName();
+
+   Configuration config = new Configuration(false);
+   config.set(key, expected);
+
+   Properties extracted = KafkaUtils.getKafkaConnectionProperties(config);
+   Object actual = extracted.get(key);
+   assertThat(actual, is((Object) expected));
+ }
+
+ /**
+  * A property passed to KafkaUtils.addKafkaConnectionProperties must afterwards be readable,
+  * under the same key, from the target Hadoop Configuration.
+  */
+ @Test
+ public void addKafkaProperties() {
+   final String key = "fake.kafka.property";
+   final String expected = testName.getMethodName();
+
+   Properties source = new Properties();
+   source.setProperty(key, expected);
+
+   Configuration target = new Configuration(false);
+   KafkaUtils.addKafkaConnectionProperties(source, target);
+
+   String actual = target.get(key);
+   assertThat(actual, is(expected));
+ }
+
+
+ /**
+ * getBrokerOffsets must reject null Properties with IllegalArgumentException.
+ * The cast selects the Properties overload.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void getBrokerOffsetsKafkaNullProperties() throws IOException {
+ KafkaUtils.getBrokerOffsets((Properties) null, kafka.api.OffsetRequest.LatestTime(), topic);
+ }
+
+ /**
+ * getBrokerOffsets must reject a null topics array with IllegalArgumentException.
+ * The cast passes null as the whole varargs array rather than as a single null topic.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void getBrokerOffsetsKafkaNullTopics() throws IOException {
+ KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime(), (String[]) null);
+ }
+
+ /**
+ * getBrokerOffsets must reject a call that names no topics with IllegalArgumentException.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void getBrokerOffsetsKafkaEmptyTopics() throws IOException {
+ KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime());
+ }
+
+ /**
+  * Writes records, then polls the latest broker offsets every 100 ms until all four partitions
+  * report offset 1. The 10 second test timeout bounds the polling loop.
+  */
+ @Test(timeout = 10000)
+ public void getLatestBrokerOffsetsKafka() throws IOException, InterruptedException {
+   ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
+
+   boolean everyPartitionAtOne;
+   do {
+     Map<TopicPartition, Long> latest = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
+         kafka.api.OffsetRequest.LatestTime(), topic);
+
+     assertNotNull(latest);
+     assertThat(latest.size(), is(4));
+
+     everyPartitionAtOne = true;
+     for (int partition = 0; partition < 4; partition++) {
+       TopicPartition topicPartition = new TopicPartition(topic, partition);
+       assertThat(latest.keySet(), hasItem(topicPartition));
+       if (latest.get(topicPartition) != 1L) {
+         everyPartitionAtOne = false;
+       }
+     }
+
+     // Not there yet: wait briefly before asking the broker again.
+     if (!everyPartitionAtOne) {
+       Thread.sleep(100L);
+     }
+   } while (!everyPartitionAtOne);
+ }
+
+ /**
+ * After writing to the topic, the earliest broker offset of each of its four partitions
+ * must be 0.
+ */
+ @Test
+ public void getEarliestBrokerOffsetsKafka() throws IOException {
+ ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 1);
+
+ Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
+ kafka.api.OffsetRequest.EarliestTime(), topic);
+
+ assertNotNull(offsets);
+ //the topic is expected to have 4 partitions
+ assertThat(offsets.size(), is(4));
+ for (int i = 0; i < 4; i++) {
+ assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
+ assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
+ }
+ }
+
+ /**
+ * Asking for offsets at a timestamp of 1L must return offset 0 for each of the topic's
+ * four partitions.
+ */
+ @Test
+ public void getBrokerOffsetsKafkaWithTimeBeforeTopicExists() throws IOException {
+ ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
+
+ // A time of 1L (1 ms after epoch) should be before the topic was created
+ Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), 1L, topic);
+
+ assertNotNull(offsets);
+ //the topic is expected to have 4 partitions
+ assertThat(offsets.size(), is(4));
+ for (int i = 0; i < 4; i++) {
+ assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
+ assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
+ }
+ }
+
+ /**
+ * When both the bootstrap servers and metadata.broker.list point only at dummy hosts,
+ * getBrokerOffsets must fail with IllegalStateException. The assertNotNull is only
+ * reached if that expected exception is not thrown.
+ */
+ @Test(expected = IllegalStateException.class)
+ public void getBrokerOffsetsNoHostAvailable() throws IOException {
+ Properties testProperties = ClusterTest.getConsumerProperties();
+ testProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
+ testProperties.setProperty("metadata.broker.list", "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
+ assertNotNull(KafkaUtils.getBrokerOffsets(testProperties, kafka.api.OffsetRequest.LatestTime(), topic));
+ }
+
+ /**
+ * With a broker list that mixes the real broker and a dummy one, getBrokerOffsets must
+ * still return a non-null result whichever of the two is listed first.
+ */
+ @Test
+ public void getBrokerOffsetsSomeHostsUnavailable() throws IOException {
+ final Broker bad = new Broker(0, "dummyBrokerHost1", 0, SecurityProtocol.PLAINTEXT);
+ assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(broker, bad), kafka.api.OffsetRequest.LatestTime(), topic));
+ assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(bad, broker), kafka.api.OffsetRequest.LatestTime(), topic));
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
new file mode 100644
index 0000000..d760a02
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaInputFormatIT {
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Mock
+ private TaskAttemptContext taskContext;
+
+ @Mock
+ private FormatBundle bundle;
+ private Properties consumerProps;
+ private Configuration config;
+ private String topic;
+
+ /**
+  * Runs once before the tests of this class and delegates to {@code ClusterTest.startTest()}.
+  */
+ @BeforeClass
+ public static void setup() throws Exception {
+   ClusterTest.startTest();
+ }
+
+ /**
+  * Runs once after the tests of this class and delegates to {@code ClusterTest.endTest()}.
+  */
+ @AfterClass
+ public static void cleanup() throws Exception {
+   ClusterTest.endTest();
+ }
+
+ /**
+  * Names the topic after the running test method and sets {@code KafkaSource.BytesDeserializer}
+  * as key and value deserializer on both the consumer {@link Properties} and the
+  * {@link Configuration} used by the tests.
+  */
+ @Before
+ public void setupTest() {
+   final String bytesDeserializer = KafkaSource.BytesDeserializer.class.getName();
+
+   topic = testName.getMethodName();
+
+   consumerProps = ClusterTest.getConsumerProperties();
+   consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, bytesDeserializer);
+   consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, bytesDeserializer);
+
+   config = ClusterTest.getConsumerConfig();
+   config.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, bytesDeserializer);
+   config.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, bytesDeserializer);
+ }
+
+ /**
+  * Writes data to the topic, stores the earliest and latest broker offset of every partition in
+  * the configuration and checks that {@code KafkaInputFormat.getSplits} returns as many splits as
+  * there are stored partitions, each carrying the stored start and end offset.
+  */
+ @Test
+ public void getSplitsFromFormat() throws IOException, InterruptedException {
+   ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+   Map<TopicPartition, Long> earliest = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+   Map<TopicPartition, Long> latest = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+   // pair the earliest offset of each partition with its latest offset
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+   for (TopicPartition partition : earliest.keySet()) {
+     offsets.put(partition, Pair.of(earliest.get(partition), latest.get(partition)));
+   }
+
+   KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+   KafkaInputFormat format = new KafkaInputFormat();
+   format.setConf(config);
+   List<InputSplit> splits = format.getSplits(null);
+
+   assertThat(splits.size(), is(offsets.size()));
+
+   for (InputSplit rawSplit : splits) {
+     KafkaInputSplit kafkaSplit = (KafkaInputSplit) rawSplit;
+     Pair<Long, Long> expected = offsets.get(kafkaSplit.getTopicPartition());
+     assertThat(kafkaSplit.getStartingOffset(), is(expected.first()));
+     assertThat(kafkaSplit.getEndingOffset(), is(expected.second()));
+   }
+ }
+
+ @Test
+ public void getSplitsSameStartEnd() throws IOException, InterruptedException {
+
+ Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+ for(int i = 0; i < 10; i++) {
+ offsets.put(new TopicPartition(topic, i), Pair.of((long)i, (long)i));
+ }
+
+ KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+ KafkaInputFormat inputFormat = new KafkaInputFormat();
+ inputFormat.setConf(config);
+ List<InputSplit> splits = inputFormat.getSplits(null);
+
+ assertThat(splits.size(), is(0));
+ }
+
+ /**
+  * Writes data to the topic, builds the splits from the broker offsets and reads every split
+  * through a record reader created by the input format.
+  * <p>
+  * Checks that each split yields exactly {@code end - start} records, that every key read is one
+  * of the keys returned by {@code ClusterTest.writeData}, and that the number of distinct keys
+  * read equals the size of that key list.
+  * </p>
+  */
+ @Test
+ public void getSplitsCreateReaders() throws IOException, InterruptedException {
+   List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+   Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+   Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+   for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+     Long endingOffset = endOffsets.get(entry.getKey());
+     offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+   }
+
+   KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+   KafkaInputFormat inputFormat = new KafkaInputFormat();
+   inputFormat.setConf(config);
+   List<InputSplit> splits = inputFormat.getSplits(null);
+
+   assertThat(splits.size(), is(offsets.size()));
+
+   for (InputSplit split : splits) {
+     KafkaInputSplit inputSplit = (KafkaInputSplit) split;
+     Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition());
+     assertThat(inputSplit.getStartingOffset(), is(startEnd.first()));
+     assertThat(inputSplit.getEndingOffset(), is(startEnd.second()));
+   }
+
+   //create readers and consume the data
+   when(taskContext.getConfiguration()).thenReturn(config);
+   Set<String> keysRead = new HashSet<>();
+   //read all data from all splits
+   for (InputSplit split : splits) {
+     KafkaInputSplit inputSplit = (KafkaInputSplit) split;
+     long start = inputSplit.getStartingOffset();
+     long end = inputSplit.getEndingOffset();
+
+     int numRecordsFound = 0;
+     // try-with-resources so the reader is closed even when an assertion fails part way through
+     try (RecordReader<BytesWritable, BytesWritable> recordReader = inputFormat.createRecordReader(split, taskContext)) {
+       recordReader.initialize(split, taskContext);
+
+       while (recordReader.nextKeyValue()) {
+         BytesWritable currentKey = recordReader.getCurrentKey();
+         // getBytes() exposes the backing array, which can be longer than the payload;
+         // only the first getLength() bytes belong to the key
+         String key = new String(currentKey.getBytes(), 0, currentKey.getLength());
+         keysRead.add(key);
+         assertThat(keys, hasItem(key));
+         assertThat(recordReader.getCurrentValue(), is(notNullValue()));
+         numRecordsFound++;
+       }
+     }
+
+     //assert that it encountered a partitions worth of data
+     assertThat(((long) numRecordsFound), is(end - start));
+   }
+
+   //validate the same number of unique keys was read as were written.
+   assertThat(keysRead.size(), is(keys.size()));
+ }
+
+ /**
+  * Writes start and end offsets for ten partitions of a single topic to the mocked bundle and
+  * inspects the captured {@code set} calls: one entry listing the topic's partitions plus one
+  * start and one end entry per partition.
+  */
+ @Test
+ public void writeOffsetsToFormatBundle() {
+   final String topic = testName.getMethodName();
+   final int numPartitions = 10;
+
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+   for (int p = 0; p < numPartitions; p++) {
+     offsets.put(new TopicPartition(topic, p), Pair.of((long) p, p * 10L));
+   }
+
+   KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+   ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+   ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
+
+   // a start and an end entry per partition, plus one entry for the topic itself
+   int expectedCalls = (numPartitions * 2) + 1;
+   verify(bundle, times(expectedCalls)).set(keyCaptor.capture(), valueCaptor.capture());
+
+   List<String> capturedKeys = keyCaptor.getAllValues();
+   List<String> capturedValues = valueCaptor.getAllValues();
+
+   String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic);
+   assertThat(capturedKeys, hasItem(partitionKey));
+
+   // keys and values were captured pairwise, so the index of a key locates its value
+   String partitionList = capturedValues.get(capturedKeys.indexOf(partitionKey));
+   List<String> partitionIds = Arrays.asList(partitionList.split(","));
+
+   for (int p = 0; p < numPartitions; p++) {
+     assertThat(capturedKeys, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic)));
+     String startKey = KafkaInputFormat.generatePartitionStartKey(topic, p);
+     String endKey = KafkaInputFormat.generatePartitionEndKey(topic, p);
+     assertThat(capturedKeys, hasItem(startKey));
+     assertThat(capturedKeys, hasItem(endKey));
+
+     String storedStart = capturedValues.get(capturedKeys.indexOf(startKey));
+     assertThat(storedStart, is(Long.toString(p)));
+     String storedEnd = capturedValues.get(capturedKeys.indexOf(endKey));
+     assertThat(storedEnd, is(Long.toString(p * 10L)));
+     assertThat(partitionIds, hasItem(Long.toString(p)));
+   }
+ }
+
+ /**
+  * Same check as for a plain topic name, but with a topic whose name starts with
+  * {@code "partitions."}: the captured {@code set} calls must still contain the partitions entry
+  * and a start and end entry for each of the ten partitions.
+  */
+ @Test
+ public void writeOffsetsToFormatBundleSpecialCharacters() {
+   final String topic = "partitions." + testName.getMethodName();
+   final int numPartitions = 10;
+
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+   for (int p = 0; p < numPartitions; p++) {
+     offsets.put(new TopicPartition(topic, p), Pair.of((long) p, p * 10L));
+   }
+
+   KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+   ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+   ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
+
+   // a start and an end entry per partition, plus one entry for the topic itself
+   int expectedCalls = (numPartitions * 2) + 1;
+   verify(bundle, times(expectedCalls)).set(keyCaptor.capture(), valueCaptor.capture());
+
+   List<String> capturedKeys = keyCaptor.getAllValues();
+   List<String> capturedValues = valueCaptor.getAllValues();
+
+   String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic);
+   assertThat(capturedKeys, hasItem(partitionKey));
+
+   // keys and values were captured pairwise, so the index of a key locates its value
+   String partitionList = capturedValues.get(capturedKeys.indexOf(partitionKey));
+   List<String> partitionIds = Arrays.asList(partitionList.split(","));
+
+   for (int p = 0; p < numPartitions; p++) {
+     assertThat(capturedKeys, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic)));
+     String startKey = KafkaInputFormat.generatePartitionStartKey(topic, p);
+     String endKey = KafkaInputFormat.generatePartitionEndKey(topic, p);
+     assertThat(capturedKeys, hasItem(startKey));
+     assertThat(capturedKeys, hasItem(endKey));
+
+     String storedStart = capturedValues.get(capturedKeys.indexOf(startKey));
+     assertThat(storedStart, is(Long.toString(p)));
+     String storedEnd = capturedValues.get(capturedKeys.indexOf(endKey));
+     assertThat(storedEnd, is(Long.toString(p * 10L)));
+     assertThat(partitionIds, hasItem(Long.toString(p)));
+   }
+ }
+
+ /**
+  * Writes offsets for ten topics with ten partitions each to the mocked bundle and checks, per
+  * topic, that the captured {@code set} calls contain the partitions entry and a start and end
+  * entry for every partition.
+  */
+ @Test
+ public void writeOffsetsToFormatBundleMultipleTopics() {
+   final int numPartitions = 10;
+   final int numTopics = 10;
+
+   Set<String> topics = new HashSet<>();
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+   for (int t = 0; t < numTopics; t++) {
+     String topic = testName.getMethodName() + t;
+     topics.add(topic);
+     for (int p = 0; p < numPartitions; p++) {
+       offsets.put(new TopicPartition(topic, p), Pair.of((long) p, p * 10L));
+     }
+   }
+
+   KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+   ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+   ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
+
+   // a start and an end entry per partition of every topic, plus one entry per topic
+   int expectedCalls = (numTopics * numPartitions * 2) + numTopics;
+   verify(bundle, times(expectedCalls)).set(keyCaptor.capture(), valueCaptor.capture());
+
+   List<String> capturedKeys = keyCaptor.getAllValues();
+   List<String> capturedValues = valueCaptor.getAllValues();
+
+   for (String topic : topics) {
+     String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic);
+     assertThat(capturedKeys, hasItem(partitionKey));
+
+     // keys and values were captured pairwise, so the index of a key locates its value
+     String partitionList = capturedValues.get(capturedKeys.indexOf(partitionKey));
+     List<String> partitionIds = Arrays.asList(partitionList.split(","));
+
+     for (int p = 0; p < numPartitions; p++) {
+       assertThat(capturedKeys, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic)));
+       String startKey = KafkaInputFormat.generatePartitionStartKey(topic, p);
+       String endKey = KafkaInputFormat.generatePartitionEndKey(topic, p);
+       assertThat(capturedKeys, hasItem(startKey));
+       assertThat(capturedKeys, hasItem(endKey));
+
+       String storedStart = capturedValues.get(capturedKeys.indexOf(startKey));
+       assertThat(storedStart, is(Long.toString(p)));
+       String storedEnd = capturedValues.get(capturedKeys.indexOf(endKey));
+       assertThat(storedEnd, is(Long.toString(p * 10L)));
+       assertThat(partitionIds, hasItem(Long.toString(p)));
+     }
+   }
+ }
+
+ /**
+  * Round-trips offsets for ten topics with ten partitions each through a {@link Configuration}
+  * and checks that {@code KafkaInputFormat.getOffsets} returns the same number of entries and the
+  * same start/end pair for every partition that was written.
+  */
+ @Test
+ public void getOffsetsFromConfig() {
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+
+   int numPartitions = 10;
+   int numTopics = 10;
+   for (int j = 0; j < numTopics; j++) {
+     String topic = testName.getMethodName() + ".partitions" + j;
+     for (int i = 0; i < numPartitions; i++) {
+       TopicPartition tAndP = new TopicPartition(topic, i);
+       offsets.put(tAndP, Pair.of((long) i, i * 10L));
+     }
+   }
+
+   Configuration config = new Configuration(false);
+
+   KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+   Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config);
+
+   // compare against what was written; comparing the returned map with itself can never fail
+   assertThat(returnedOffsets.size(), is(offsets.size()));
+   for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+     Pair<Long, Long> valuePair = returnedOffsets.get(entry.getKey());
+     assertThat(valuePair, is(entry.getValue()));
+   }
+ }
+
+ /**
+  * Writes offsets for ten topics, removes the start entry of partition 0 for one of them and
+  * expects {@code KafkaInputFormat.getOffsets} to throw an {@link IllegalStateException}.
+  */
+ @Test(expected=IllegalStateException.class)
+ public void getOffsetsFromConfigMissingStart() {
+   final int numPartitions = 10;
+   final int numTopics = 10;
+
+   Set<String> topics = new HashSet<>();
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+   for (int t = 0; t < numTopics; t++) {
+     String topic = testName.getMethodName() + ".partitions" + t;
+     topics.add(topic);
+     for (int p = 0; p < numPartitions; p++) {
+       offsets.put(new TopicPartition(topic, p), Pair.of((long) p, p * 10L));
+     }
+   }
+
+   Configuration config = new Configuration(false);
+   KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+   // drop a single start entry so the stored offsets are incomplete
+   String anyTopic = topics.iterator().next();
+   config.unset("org.apache.crunch.kafka.offsets.topic." + anyTopic + ".partitions.0.start");
+
+   KafkaInputFormat.getOffsets(config);
+ }
+
+ /**
+  * Writes offsets for ten topics, removes the end entry of partition 0 for one of them and
+  * expects {@code KafkaInputFormat.getOffsets} to throw an {@link IllegalStateException}.
+  */
+ @Test(expected=IllegalStateException.class)
+ public void getOffsetsFromConfigMissingEnd() {
+   final int numPartitions = 10;
+   final int numTopics = 10;
+
+   Set<String> topics = new HashSet<>();
+   Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+   for (int t = 0; t < numTopics; t++) {
+     String topic = testName.getMethodName() + ".partitions" + t;
+     topics.add(topic);
+     for (int p = 0; p < numPartitions; p++) {
+       offsets.put(new TopicPartition(topic, p), Pair.of((long) p, p * 10L));
+     }
+   }
+
+   Configuration config = new Configuration(false);
+   KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+   // drop a single end entry so the stored offsets are incomplete
+   String anyTopic = topics.iterator().next();
+   config.unset("org.apache.crunch.kafka.offsets.topic." + anyTopic + ".partitions.0.end");
+
+   KafkaInputFormat.getOffsets(config);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
new file mode 100644
index 0000000..3833e9d
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import kafka.api.OffsetRequest;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests the values reported by {@code KafkaInputSplit} for the arguments it was constructed with.
+ */
+public class KafkaInputSplitTest {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * A split built from explicit offsets reports them unchanged, has a length of
+   * {@code end - start} and reports no locations.
+   */
+  @Test
+  public void createSplit() throws IOException, InterruptedException {
+    final String topic = testName.getMethodName();
+    final int partition = 18;
+    final long startingOffset = 10;
+    final long endingOffset = 23;
+    final long expectedLength = endingOffset - startingOffset;
+
+    KafkaInputSplit split = new KafkaInputSplit(topic, partition, startingOffset, endingOffset);
+
+    assertThat(split.getStartingOffset(), is(startingOffset));
+    assertThat(split.getEndingOffset(), is(endingOffset));
+    assertThat(split.getTopicPartition(), is(new TopicPartition(topic, partition)));
+    assertThat(split.getLength(), is(expectedLength));
+    assertThat(split.getLocations(), is(new String[0]));
+  }
+
+  /**
+   * A split built with a starting offset of {@code -1} reports that value unchanged and has a
+   * length equal to its ending offset.
+   */
+  @Test
+  public void createSplitEarliestOffset() throws IOException, InterruptedException {
+    final String topic = testName.getMethodName();
+    final int partition = 18;
+    final long endingOffset = 23;
+
+    KafkaInputSplit split = new KafkaInputSplit(topic, partition, -1L, endingOffset);
+
+    assertThat(split.getStartingOffset(), is(-1L));
+    assertThat(split.getEndingOffset(), is(endingOffset));
+    assertThat(split.getTopicPartition(), is(new TopicPartition(topic, partition)));
+    assertThat(split.getLength(), is(endingOffset));
+    assertThat(split.getLocations(), is(new String[0]));
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
new file mode 100644
index 0000000..ba5b65b
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+import static org.mockito.Mockito.when;
+
+/**
+ * Integration test that writes data to the test cluster and reads it back, partition by
+ * partition, through {@code KafkaRecordReader}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordReaderIT {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Mock
+  private TaskAttemptContext context;
+
+  private String topic;
+  private Properties consumerProps;
+  private Configuration config;
+
+  /** Runs once before the tests of this class and delegates to {@code ClusterTest.startTest()}. */
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  /** Runs once after the tests of this class and delegates to {@code ClusterTest.endTest()}. */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  /**
+   * Names the topic after the running test method, fetches the consumer settings from
+   * {@code ClusterTest} and makes the mocked context return the consumer configuration.
+   */
+  @Before
+  public void setupTest() {
+    topic = testName.getMethodName();
+    consumerProps = ClusterTest.getConsumerProperties();
+    config = ClusterTest.getConsumerConfig();
+    when(context.getConfiguration()).thenReturn(config);
+  }
+
+  /**
+   * Reads every partition from its earliest to its latest broker offset and checks that each
+   * partition yields {@code end - start} records, that every key read is one of the keys returned
+   * by {@code ClusterTest.writeData}, and that the number of distinct keys read equals the size of
+   * that key list.
+   */
+  @Test
+  public void readData() throws IOException, InterruptedException {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+
+    Map<TopicPartition, Long> earliest = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> latest = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    // pair the earliest offset of each partition with its latest offset
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (TopicPartition partition : earliest.keySet()) {
+      offsets.put(partition, Pair.of(earliest.get(partition), latest.get(partition)));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    Set<String> keysRead = new HashSet<>();
+    //read all data from all splits
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
+      TopicPartition partition = partitionInfo.getKey();
+      Pair<Long, Long> range = partitionInfo.getValue();
+      KafkaInputSplit split = new KafkaInputSplit(partition.topic(), partition.partition(),
+          range.first(), range.second());
+
+      KafkaRecordReader<String, String> reader = new KafkaRecordReader<>();
+      reader.initialize(split, context);
+
+      int recordCount = 0;
+      while (reader.nextKeyValue()) {
+        keysRead.add(reader.getCurrentKey());
+        assertThat(keys, hasItem(reader.getCurrentKey()));
+        assertThat(reader.getCurrentValue(), is(notNullValue()));
+        recordCount++;
+      }
+      reader.close();
+
+      //assert that it encountered a partitions worth of data
+      assertThat(((long) recordCount), is(range.second() - range.first()));
+    }
+
+    //validate the same number of unique keys was read as were written.
+    assertThat(keysRead.size(), is(keys.size()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java
new file mode 100644
index 0000000..ede3cf0
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * <p>
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Embedded Zookeeper instance for testing purposes.
+ * <p>
+ * Adapted from the {@code kafka.zk.EmbeddedZookeeper} class.
+ * </p>
+ */
+class EmbeddedZookeeper {
+
+  /** Tick time handed to the {@link ZooKeeperServer}. */
+  private static final int TICK_TIME = 500;
+
+  /** Maximum number of client connections handed to the connection factory. */
+  private static final int MAX_CLIENT_CONNECTIONS = 1024;
+
+  private final File snapshotDir;
+  private final File logDir;
+  private final NIOServerCnxnFactory factory;
+
+  /**
+   * Constructs an embedded Zookeeper instance.
+   *
+   * @param connectString Zookeeper connection string; the part before the first {@code ':'} is
+   * used as host name and the part after it as port.
+   *
+   * @throws IOException if an error occurs during Zookeeper initialization, or if the calling
+   * thread is interrupted while the server starts (the interrupt flag is restored in that case).
+   */
+  public EmbeddedZookeeper(String connectString) throws IOException {
+    this.snapshotDir = KafkaTestUtils.getTempDir();
+    this.logDir = KafkaTestUtils.getTempDir();
+    this.factory = new NIOServerCnxnFactory();
+
+    // split once and reuse both halves instead of re-splitting for every component
+    String[] hostAndPort = connectString.split(":");
+    String hostname = hostAndPort[0];
+    int port = Integer.parseInt(hostAndPort[1]);
+
+    factory.configure(new InetSocketAddress(hostname, port), MAX_CLIENT_CONNECTIONS);
+    try {
+      factory.startup(new ZooKeeperServer(snapshotDir, logDir, TICK_TIME));
+    } catch (InterruptedException e) {
+      // wrapping alone would swallow the interruption; restore the flag for callers further up
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Shuts down the embedded Zookeeper instance and deletes its snapshot and log directories.
+   *
+   * @throws IOException if one of the directories cannot be deleted.
+   */
+  public void shutdown() throws IOException {
+    factory.shutdown();
+    try {
+      FileUtils.deleteDirectory(snapshotDir);
+    } finally {
+      // still attempt to remove the log directory when the snapshot directory could not be deleted
+      FileUtils.deleteDirectory(logDir);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
new file mode 100644
index 0000000..f47f168
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.kafka.utils;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.apache.commons.io.FileUtils;
+import scala.Option;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static scala.collection.JavaConversions.asJavaIterable;
+
+/**
+ * A test harness that brings up some number of Kafka broker nodes.
+ * <p>
+ * Adapted from the {@code kafka.integration.KafkaServerTestHarness} class.
+ * </p>
+ */
+public class KafkaBrokerTestHarness extends ZookeeperTestHarness {
+
+  /**
+   * Producer send acknowledgment timeout in milliseconds.
+   */
+  public static final String KAFKA_PRODUCER_ACK_TIMEOUT_MILLIS = "request.timeout.ms";
+
+  /**
+   * Producer send retry maximum count.
+   */
+  public static final String KAFKA_PRODUCER_RETRY_MAX = "message.send.max.retries";
+
+  /**
+   * Producer send retry backoff interval in milliseconds.
+   */
+  public static final String KAFKA_PRODUCER_RETRY_INTERVAL_MILLIS = "retry.backoff.ms";
+
+  /**
+   * Comma-delimited Kafka Zookeeper quorum list.
+   */
+  public static final String KAFKA_ZOOKEEPERS = "zookeeper.connect";
+
+  /**
+   * Comma-delimited list of Kafka brokers, for producer bootstrapping purposes.
+   */
+  public static final String KAFKA_BROKERS = "metadata.broker.list";
+
+  /**
+   * Default number of brokers in the Kafka cluster.
+   */
+  public static final int DEFAULT_BROKERS = 1;
+
+  /**
+   * Default number of partitions per Kafka topic.
+   */
+  public static final int PARTITIONS_PER_TOPIC = 4;
+
+  private List<KafkaConfig> brokerConfigs;
+  private List<KafkaServer> brokers;
+  private File clientConfig;
+  private boolean setUp;
+  private boolean tornDown;
+
+  /**
+   * Creates a new Kafka broker test harness using the {@link #DEFAULT_BROKERS default} number of brokers.
+   */
+  public KafkaBrokerTestHarness() {
+    this(DEFAULT_BROKERS, KafkaTestUtils.getPorts(1)[0]);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the {@link #DEFAULT_BROKERS default} number of brokers and the supplied
+   * {@link Properties} which will be applied to the brokers.
+   *
+   * @param properties
+   *            the additional {@link Properties} supplied to the brokers
+   * @throws IllegalArgumentException
+   *             if {@code properties} is {@code null}
+   */
+  public KafkaBrokerTestHarness(Properties properties) {
+    this(DEFAULT_BROKERS, KafkaTestUtils.getPorts(1)[0], properties);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the given number of brokers and Zookeeper port.
+   *
+   * @param brokers Number of Kafka brokers to start up.
+   * @param zookeeperPort The port number to use for Zookeeper client connections.
+   *
+   * @throws IllegalArgumentException if {@code brokers} is less than 1.
+   */
+  public KafkaBrokerTestHarness(int brokers, int zookeeperPort) {
+    this(getBrokerConfig(brokers, zookeeperPort), zookeeperPort);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the given number of brokers and Zookeeper port.
+   *
+   * @param brokers
+   *            Number of Kafka brokers to start up.
+   * @param zookeeperPort
+   *            The port number to use for Zookeeper client connections.
+   * @param properties
+   *            the additional {@link Properties} supplied to the brokers
+   *
+   * @throws IllegalArgumentException
+   *             if {@code brokers} is less than 1 or if {@code properties} is {@code null}
+   */
+  public KafkaBrokerTestHarness(int brokers, int zookeeperPort, Properties properties) {
+    this(getBrokerConfig(brokers, zookeeperPort, properties), zookeeperPort);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the given broker configuration properties and Zookeeper port.
+   *
+   * @param brokerConfigs List of Kafka broker configurations.
+   * @param zookeeperPort The port number to use for Zookeeper client connections.
+   *
+   * @throws IllegalArgumentException if {@code brokerConfigs} is {@code null} or empty.
+   */
+  public KafkaBrokerTestHarness(List<KafkaConfig> brokerConfigs, int zookeeperPort) {
+    super(zookeeperPort);
+    if (brokerConfigs == null || brokerConfigs.isEmpty()) {
+      throw new IllegalArgumentException("Must supply at least one broker configuration.");
+    }
+    this.brokerConfigs = brokerConfigs;
+    this.brokers = null;
+    this.setUp = false;
+    this.tornDown = false;
+  }
+
+  /**
+   * Start up the Kafka broker cluster.
+   * <p>
+   * Starts Zookeeper, then one broker per supplied configuration, and finally writes the broker and Zookeeper
+   * connection properties to a client configuration file (see {@link #getClientConfigPath()}).
+   * </p>
+   *
+   * @throws IOException if an error occurs during Kafka broker startup.
+   * @throws IllegalStateException if the Kafka broker cluster has already been {@link #setUp() setup}.
+   */
+  @Override
+  public void setUp() throws IOException {
+    if (setUp) {
+      throw new IllegalStateException("Already setup, cannot setup again");
+    }
+    setUp = true;
+
+    // Start up zookeeper.
+    super.setUp();
+
+    brokers = new ArrayList<KafkaServer>(brokerConfigs.size());
+    for (KafkaConfig config : brokerConfigs) {
+      brokers.add(startBroker(config));
+    }
+
+    // Write out Kafka client config to a temp file.
+    clientConfig = new File(KafkaTestUtils.getTempDir(), "kafka-config.xml");
+    // The combined properties do not change while writing, so build them once.
+    Properties clientProps = getProps();
+    FileWriter writer = new FileWriter(clientConfig);
+    try {
+      writer.append("<configuration>");
+      for (String prop : Arrays.asList(KAFKA_BROKERS, KAFKA_ZOOKEEPERS)) {
+        writer.append("<property>");
+        writer.append("<name>").append(prop).append("</name>");
+        writer.append("<value>").append(clientProps.getProperty(prop)).append("</value>");
+        writer.append("</property>");
+      }
+      writer.append("</configuration>");
+    } finally {
+      // Release the file handle even when a write fails.
+      writer.close();
+    }
+  }
+
+  /**
+   * Shutdown the Kafka broker cluster. Attempting to {@link #setUp()} a cluster again after calling this method is not allowed;
+   * a new {@code KafkaBrokerTestHarness} must be created instead.
+   * <p>
+   * Zookeeper is shut down even if stopping a broker or deleting its log directories fails.
+   * </p>
+   *
+   * @throws IOException if a broker log directory could not be deleted or Zookeeper could not be shut down.
+   * @throws IllegalStateException if the Kafka broker cluster has already been {@link #tearDown() torn down} or has not been
+   *      {@link #setUp()}.
+   */
+  @Override
+  public void tearDown() throws IOException {
+    if (!setUp) {
+      throw new IllegalStateException("Not set up, cannot tear down");
+    }
+    if (tornDown) {
+      throw new IllegalStateException("Already torn down, cannot tear down again");
+    }
+    tornDown = true;
+
+    try {
+      // brokers is still null when setUp() failed before any broker was started.
+      if (brokers != null) {
+        for (KafkaServer broker : brokers) {
+          broker.shutdown();
+        }
+
+        for (KafkaServer broker : brokers) {
+          for (String logDir : asJavaIterable(broker.config().logDirs())) {
+            FileUtils.deleteDirectory(new File(logDir));
+          }
+        }
+      }
+    } finally {
+      // Shutdown zookeeper
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Returns properties for a Kafka producer.
+   *
+   * @return Producer properties.
+   */
+  public Properties getProducerProps() {
+    // Build the "host:port,host:port" list from the broker configurations.
+    StringBuilder brokerList = new StringBuilder();
+    for (int i = 0; i < brokerConfigs.size(); ++i) {
+      KafkaConfig config = brokerConfigs.get(i);
+      brokerList.append((i > 0) ? "," : "").append(config.hostName()).append(":").append(config.port());
+    }
+
+    Properties props = new Properties();
+    props.setProperty(KAFKA_BROKERS, brokerList.toString());
+    props.setProperty(KAFKA_PRODUCER_ACK_TIMEOUT_MILLIS, "10000");
+
+    // These two properties below are increased from their defaults to help with the case that auto.create.topics.enable is
+    // disabled and a test tries to create a topic and immediately write to it
+    props.setProperty(KAFKA_PRODUCER_RETRY_INTERVAL_MILLIS, Integer.toString(500));
+    props.setProperty(KAFKA_PRODUCER_RETRY_MAX, Integer.toString(10));
+
+    return props;
+  }
+
+  /**
+   * Returns properties for a Kafka consumer.
+   *
+   * @return Consumer properties.
+   */
+  public Properties getConsumerProps() {
+    Properties props = new Properties();
+    props.setProperty(KAFKA_ZOOKEEPERS, zookeeperConnect);
+    return props;
+  }
+
+  /**
+   * Returns properties for either a Kafka producer or consumer.
+   *
+   * @return Combined producer and consumer properties.
+   */
+  public Properties getProps() {
+    // Combine producer and consumer properties.
+    Properties props = getProducerProps();
+    props.putAll(getConsumerProps());
+    return props;
+  }
+
+  /**
+   * Returns configuration properties for each Kafka broker in the cluster. The brokers only exist once
+   * {@link #setUp()} has been called.
+   *
+   * @return Broker properties.
+   */
+  public List<Properties> getBrokerProps() {
+    List<Properties> props = new ArrayList<Properties>(brokers.size());
+    for (KafkaServer broker : brokers) {
+      Properties prop = new Properties();
+      prop.putAll(broker.config().props());
+      props.add(prop);
+    }
+    return props;
+  }
+
+  /**
+   * Creates a collection of Kafka Broker configurations based on the number of brokers and zookeeper.
+   * @param brokers the number of brokers to create configuration for.
+   * @param zookeeperPort the zookeeper port for the brokers to connect to.
+   * @return configuration for a collection of brokers.
+   * @throws IllegalArgumentException if {@code brokers} is less than 1
+   */
+  public static List<KafkaConfig> getBrokerConfig(int brokers, int zookeeperPort) {
+    return getBrokerConfig(brokers, zookeeperPort, new Properties());
+  }
+
+  /**
+   * Creates a collection of Kafka Broker configurations based on the number of brokers and zookeeper.
+   * @param brokers the number of brokers to create configuration for.
+   * @param zookeeperPort the zookeeper port for the brokers to connect to.
+   * @param baseProperties basic properties that should be applied for each broker config.  These properties will be
+   *                       honored in favor of any default properties.
+   * @return configuration for a collection of brokers.
+   * @throws IllegalArgumentException if {@code brokers} is less than 1 or {@code baseProperties} is {@code null}.
+   */
+  public static List<KafkaConfig> getBrokerConfig(int brokers, int zookeeperPort, Properties baseProperties) {
+    if (brokers < 1) {
+      throw new IllegalArgumentException("Invalid broker count: " + brokers);
+    }
+    if (baseProperties == null) {
+      throw new IllegalArgumentException("The 'baseProperties' cannot be 'null'.");
+    }
+
+    int[] ports = KafkaTestUtils.getPorts(brokers);
+
+    List<KafkaConfig> configs = new ArrayList<KafkaConfig>(brokers);
+    for (int i = 0; i < brokers; ++i) {
+      Properties props = new Properties();
+      props.setProperty(KAFKA_ZOOKEEPERS, "localhost:" + zookeeperPort);
+      props.setProperty("broker.id", String.valueOf(i + 1));
+      props.setProperty("host.name", "localhost");
+      props.setProperty("port", String.valueOf(ports[i]));
+      props.setProperty("log.dir", KafkaTestUtils.getTempDir().getAbsolutePath());
+      props.setProperty("log.flush.interval.messages", String.valueOf(1));
+      props.setProperty("num.partitions", String.valueOf(PARTITIONS_PER_TOPIC));
+      props.setProperty("default.replication.factor", String.valueOf(brokers));
+      props.setProperty("auto.create.topics.enable", Boolean.FALSE.toString());
+
+      // Applied last so that caller-supplied values override the defaults above.
+      props.putAll(baseProperties);
+
+      configs.add(new KafkaConfig(props));
+    }
+    return configs;
+  }
+
+  /**
+   * Returns location of Kafka client configuration file containing broker and zookeeper connection properties.
+   * <p>
+   * This file can be loaded using the {@code -conf} command option to easily achieve Kafka connectivity.
+   * </p>
+   *
+   * @return Kafka client configuration file path
+   */
+  public String getClientConfigPath() {
+    return clientConfig.getAbsolutePath();
+  }
+
+  /**
+   * Creates a broker from the given configuration and starts it.
+   */
+  private static KafkaServer startBroker(KafkaConfig config) {
+    KafkaServer server = new KafkaServer(config, new SystemTime(), Option.<String>empty());
+    server.startup();
+    return server;
+  }
+
+  /**
+   * {@link Time} implementation backed by the JVM system clock.
+   */
+  private static class SystemTime implements Time {
+    @Override
+    public long milliseconds() {
+      return System.currentTimeMillis();
+    }
+
+    @Override
+    public long nanoseconds() {
+      return System.nanoTime();
+    }
+
+    @Override
+    public void sleep(long ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // The Time interface cannot propagate the exception, so return early but keep the interrupt flag set
+        // for whoever is waiting on this thread to stop.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
new file mode 100644
index 0000000..f8eb2ff
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Assorted Kafka testing utility methods.
+ */
+public class KafkaTestUtils {
+
+  private static final Random RANDOM = new Random();
+  private static final String TEMP_DIR_PREFIX = "kafka-";
+
+  /** How many random directory names to try before giving up. */
+  private static final int TEMP_DIR_ATTEMPTS = 10;
+
+  /** Ports already handed out by {@link #getPorts(int)}; a port is never returned twice. */
+  private static final Set<Integer> USED_PORTS = new HashSet<Integer>();
+
+  /**
+   * Creates and returns a new randomly named temporary directory under {@code java.io.tmpdir}.
+   * <p>
+   * The directory is registered with {@link File#deleteOnExit()}; note that this only removes the directory at JVM
+   * exit if it is empty by then, so callers that write into it should delete its contents themselves.
+   * </p>
+   *
+   * @return a new temporary directory.
+   *
+   * @throws RuntimeException if a new temporary directory could not be created.
+   */
+  public static File getTempDir() {
+    File parent = new File(System.getProperty("java.io.tmpdir"));
+    File candidate = null;
+    // mkdirs() reports false when the randomly chosen name already exists, so retry with a fresh name.
+    for (int attempt = 0; attempt < TEMP_DIR_ATTEMPTS; attempt++) {
+      candidate = new File(parent, TEMP_DIR_PREFIX + RANDOM.nextInt(10000000));
+      if (candidate.mkdirs()) {
+        candidate.deleteOnExit();
+        return candidate;
+      }
+    }
+    throw new RuntimeException("could not create temp directory: " + candidate.getAbsolutePath());
+  }
+
+  /**
+   * Returns an array containing the specified number of available local ports.
+   * <p>
+   * Every probe socket is closed before this method returns, including when opening one of them fails.
+   * </p>
+   *
+   * @param count Number of local ports to identify and return.
+   *
+   * @return an array of available local port numbers.
+   *
+   * @throws RuntimeException if an I/O error occurs opening or closing a socket.
+   */
+  public static int[] getPorts(int count) {
+    int[] ports = new int[count];
+    Set<ServerSocket> openSockets = new HashSet<ServerSocket>(count + USED_PORTS.size());
+
+    IOException openFailure = null;
+    try {
+      for (int i = 0; i < count; ) {
+        // Port 0 asks the OS for any free port; the socket stays open until the end so the same port
+        // is not offered again within this call.
+        ServerSocket socket = new ServerSocket(0);
+        openSockets.add(socket);
+
+        // Disallow port reuse: add() is false for a port that was handed out by an earlier call.
+        int port = socket.getLocalPort();
+        if (USED_PORTS.add(port)) {
+          ports[i++] = port;
+        }
+      }
+    } catch (IOException e) {
+      openFailure = e;
+    }
+
+    // Close the sockets so that their port numbers can be used by the caller.
+    IOException closeFailure = closeAll(openSockets);
+
+    if (openFailure != null) {
+      throw new RuntimeException("could not open socket", openFailure);
+    }
+    if (closeFailure != null) {
+      throw new RuntimeException("could not close socket", closeFailure);
+    }
+    return ports;
+  }
+
+  /**
+   * Closes every socket, continuing past failures, and returns the first failure seen (or {@code null}).
+   */
+  private static IOException closeAll(Set<ServerSocket> sockets) {
+    IOException firstFailure = null;
+    for (ServerSocket socket : sockets) {
+      try {
+        socket.close();
+      } catch (IOException e) {
+        if (firstFailure == null) {
+          firstFailure = e;
+        }
+      }
+    }
+    return firstFailure;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java
new file mode 100644
index 0000000..6ee102e
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.utils;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * A {@link ZkSerializer Zookeeper serializer} for {@link String} objects.
+ * <p>
+ * Ported from the {@code kafka.utils.ZKStringSerializer} scala object.
+ * </p>
+ */
+public class ZkStringSerializer implements ZkSerializer {
+
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+ @Override
+ public byte[] serialize(Object data) throws ZkMarshallingError {
+ return ((String) data).getBytes(UTF_8);
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+ return bytes != null ? new String(bytes, UTF_8) : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java
new file mode 100644
index 0000000..c4a7e15
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.kafka.utils;
+
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+
+import java.io.IOException;
+
+/**
+ * A test harness that brings up an embedded Zookeeper instance.
+ * <p>
+ * Adapted from the {@code kafka.zk.ZooKeeperTestHarness} class.
+ * </p>
+ */
+public class ZookeeperTestHarness {
+
+  /**
+   * Zookeeper connection info.
+   */
+  protected final String zookeeperConnect;
+
+  private EmbeddedZookeeper zookeeper;
+  private final int zkConnectionTimeout;
+  private final int zkSessionTimeout;
+
+  /**
+   * Zookeeper client connection.
+   */
+  protected ZkUtils zkUtils;
+
+  /**
+   * Creates a new Zookeeper service test harness on an available local port.
+   */
+  public ZookeeperTestHarness() {
+    this(KafkaTestUtils.getPorts(1)[0]);
+  }
+
+  /**
+   * Creates a new Zookeeper service test harness using the given port.
+   *
+   * @param zookeeperPort The port number to use for Zookeeper client connections.
+   */
+  public ZookeeperTestHarness(int zookeeperPort) {
+    this.zookeeper = null;
+    this.zkUtils = null;
+    this.zkConnectionTimeout = 6000;
+    this.zkSessionTimeout = 6000;
+    this.zookeeperConnect = "localhost:" + zookeeperPort;
+  }
+
+  /**
+   * Returns a client for communicating with the Zookeeper service.
+   *
+   * @return A Zookeeper client.
+   *
+   * @throws IllegalStateException
+   *             if Zookeeper has not yet been {@link #setUp()}, or has already been {@link #tearDown() torn down}.
+   */
+  public ZkClient getZkClient() {
+    if (zkUtils == null) {
+      throw new IllegalStateException("Zookeeper service is not active");
+    }
+    return zkUtils.zkClient();
+  }
+
+  /**
+   * Returns the {@link ZkUtils} instance created by {@link #setUp()}.
+   *
+   * @return the Zookeeper utilities, or {@code null} if Zookeeper has not yet been {@link #setUp()} or has already
+   * been {@link #tearDown() torn down}.
+   */
+  public ZkUtils getZkUtils() {
+    return zkUtils;
+  }
+
+  /**
+   * Startup Zookeeper.
+   *
+   * @throws IOException if an error occurs during Zookeeper initialization.
+   */
+  public void setUp() throws IOException {
+    zookeeper = new EmbeddedZookeeper(zookeeperConnect);
+    ZkClient zkClient = new ZkClient(zookeeperConnect, zkSessionTimeout, zkConnectionTimeout, new ZkStringSerializer());
+    ZkConnection connection = new ZkConnection(zookeeperConnect, zkSessionTimeout);
+    zkUtils = new ZkUtils(zkClient, connection, false);
+  }
+
+  /**
+   * Shutdown Zookeeper.
+   * <p>
+   * The embedded server is stopped even if closing the client fails, and both references are cleared either way.
+   * </p>
+   *
+   * @throws IOException if an error occurs while shutting down the embedded Zookeeper instance.
+   */
+  public void tearDown() throws IOException {
+    try {
+      if (zkUtils != null) {
+        zkUtils.close();
+      }
+    } finally {
+      zkUtils = null;
+      try {
+        if (zookeeper != null) {
+          zookeeper.shutdown();
+        }
+      } finally {
+        zookeeper = null;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/resources/log4j.properties b/crunch-kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..0b3eeee
--- /dev/null
+++ b/crunch-kafka/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, A1
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+
+# Print the date in ISO 8601 format
+log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+# Limit Apache logging to keep us from being overwhelmed as our tests stop and restart servers.
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.apache.zookeeper.client.ZooKeeperSaslClient=ERROR
+log4j.logger.org.apache.hadoop.conf.Configuration=ERROR
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5b390ee..78ea085 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@ under the License.
<module>crunch-spark</module>
<module>crunch-hive</module>
<module>crunch-dist</module>
+ <module>crunch-kafka</module>
</modules>
<profiles>
<profile>
@@ -103,6 +104,7 @@ under the License.
<hbase.version>1.0.0</hbase.version>
<avro.classifier>hadoop2</avro.classifier>
+ <kafka.version>0.9.0.1</kafka.version>
<scala.base.version>2.10</scala.base.version>
<scala.version>2.10.4</scala.version>
<scalatest.version>2.2.4</scalatest.version>
@@ -455,6 +457,17 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.base.version}</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>${jsr305.version}</version>
[2/3] crunch git commit: CRUNCH-606: Kafka Source for Crunch which
supports reading data as BytesWritable
Posted by mk...@apache.org.
CRUNCH-606: Kafka Source for Crunch which supports reading data as BytesWritable
* Some of the code contributed by Bryan Baugher and Andrew Olson
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/321cfef6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/321cfef6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/321cfef6
Branch: refs/heads/master
Commit: 321cfef6e85325ab7a4d9548686a96972f6f31fd
Parents: c09c4ee
Author: Micah Whitacre <mk...@gmail.com>
Authored: Mon Apr 11 09:47:33 2016 -0500
Committer: Micah Whitacre <mk...@gmail.com>
Committed: Mon May 23 15:13:36 2016 -0500
----------------------------------------------------------------------
crunch-kafka/pom.xml | 82 ++++
.../java/org/apache/crunch/kafka/KafkaData.java | 63 +++
.../crunch/kafka/KafkaRecordsIterable.java | 294 +++++++++++++
.../org/apache/crunch/kafka/KafkaSource.java | 225 ++++++++++
.../org/apache/crunch/kafka/KafkaUtils.java | 301 ++++++++++++++
.../kafka/inputformat/KafkaInputFormat.java | 235 +++++++++++
.../kafka/inputformat/KafkaInputSplit.java | 117 ++++++
.../kafka/inputformat/KafkaRecordReader.java | 152 +++++++
.../org/apache/crunch/kafka/ClusterTest.java | 217 ++++++++++
.../org/apache/crunch/kafka/KafkaDataIT.java | 118 ++++++
.../crunch/kafka/KafkaRecordsIterableIT.java | 415 +++++++++++++++++++
.../org/apache/crunch/kafka/KafkaSourceIT.java | 169 ++++++++
.../org/apache/crunch/kafka/KafkaUtilsIT.java | 188 +++++++++
.../kafka/inputformat/KafkaInputFormatIT.java | 407 ++++++++++++++++++
.../kafka/inputformat/KafkaInputSplitTest.java | 65 +++
.../kafka/inputformat/KafkaRecordReaderIT.java | 122 ++++++
.../crunch/kafka/utils/EmbeddedZookeeper.java | 102 +++++
.../kafka/utils/KafkaBrokerTestHarness.java | 369 +++++++++++++++++
.../crunch/kafka/utils/KafkaTestUtils.java | 94 +++++
.../crunch/kafka/utils/ZkStringSerializer.java | 43 ++
.../kafka/utils/ZookeeperTestHarness.java | 112 +++++
.../src/test/resources/log4j.properties | 29 ++
pom.xml | 13 +
23 files changed, 3932 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml
new file mode 100644
index 0000000..a96a9b0
--- /dev/null
+++ b/crunch-kafka/pom.xml
@@ -0,0 +1,82 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-parent</artifactId>
+ <version>0.14.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>crunch-kafka</artifactId>
+ <name>Apache Crunch for Kafka</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
new file mode 100644
index 0000000..6543aad
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+class KafkaData<K, V> implements ReadableData<Pair<K, V>> {
+
+  private static final long serialVersionUID = -6582212311361579556L;
+
+  /** Per-partition pair of offsets handed to the {@link KafkaRecordsIterable} when reading. */
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+  /** Properties used to construct the Kafka consumer and passed on to the {@link KafkaRecordsIterable}. */
+  private final Properties props;
+
+  /**
+   * Creates readable data over the given partitions and offsets.
+   *
+   * @param connectionProperties properties used to create the Kafka consumer.
+   * @param offsets the offsets to read for each topic partition.
+   */
+  public KafkaData(Properties connectionProperties,
+                   Map<TopicPartition, Pair<Long, Long>> offsets) {
+    this.props = connectionProperties;
+    this.offsets = offsets;
+  }
+
+
+  /**
+   * Returns the source targets this data depends on.
+   *
+   * @return an empty set; it is returned instead of {@code null} so that callers can iterate or
+   * {@code addAll} the result without a null check.
+   */
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return java.util.Collections.<SourceTarget<?>>emptySet();
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    //no-op
+  }
+
+  /**
+   * Creates a new Kafka consumer from the connection properties and returns an iterable over the records in the
+   * configured offsets. The consumer is handed to the returned iterable and is not closed here.
+   */
+  @Override
+  public Iterable<Pair<K, V>> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+    Consumer<K, V> consumer = new KafkaConsumer<K, V>(props);
+    return new KafkaRecordsIterable<>(consumer, offsets, props);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
new file mode 100644
index 0000000..8fec7f8
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.kafka.inputformat.KafkaRecordReader;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * An {@link Iterable} over the key/value pairs stored in Kafka for a bounded set of offset ranges.
+ * <p>
+ * The same {@link Consumer} instance is used by every call to {@link #iterator()} and it is closed as soon as
+ * an iterator has consumed all of the requested ranges.
+ *
+ * @param <K> type of the record keys
+ * @param <V> type of the record values
+ */
+class KafkaRecordsIterable<K, V> implements Iterable<Pair<K, V>> {
+
+  /**
+   * Logger
+   */
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordsIterable.class);
+
+  /**
+   * The Kafka consumer responsible for retrieving messages.
+   */
+  private final Consumer<K, V> consumer;
+
+  /**
+   * The start (first) and end (second) offsets per topic partition, restricted to ranges where start is less than end.
+   */
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+  /**
+   * Tracks if the iterable is empty.
+   */
+  private final boolean isEmpty;
+
+  /**
+   * The poll time between each request to Kafka
+   */
+  private final long scanPollTime;
+
+  /**
+   * Maximum number of poll attempts made before a {@link RetriableException} is rethrown.
+   */
+  private final int maxRetryAttempts;
+
+  /**
+   * Creates the iterable that will pull values for a collection of topics using the provided {@code consumer} between
+   * the start and end offsets given for each partition in {@code offsets}.
+   *
+   * @param consumer   The consumer for pulling the data from Kafka.  The consumer will be closed automatically once all
+   *                   of the records have been consumed.
+   * @param offsets    offsets for pulling data; entries whose start offset is not less than the end offset are ignored
+   * @param properties properties for tweaking the behavior of the iterable ({@link KafkaUtils#KAFKA_RETRY_ATTEMPTS_KEY}
+   *                   and {@link KafkaSource#CONSUMER_POLL_TIMEOUT_KEY} are read).
+   * @throws IllegalArgumentException if any of the arguments are {@code null} or empty, or if one of the numeric
+   *                                  properties cannot be parsed.
+   */
+  public KafkaRecordsIterable(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, Long>> offsets,
+                              Properties properties) {
+    if (consumer == null) {
+      throw new IllegalArgumentException("The 'consumer' cannot be 'null'.");
+    }
+    this.consumer = consumer;
+
+    if (properties == null) {
+      throw new IllegalArgumentException("The 'properties' cannot be 'null'.");
+    }
+
+    String retryString = properties.getProperty(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY,
+        KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING);
+    maxRetryAttempts = Integer.parseInt(retryString);
+
+    if (offsets == null || offsets.isEmpty()) {
+      throw new IllegalArgumentException("The 'offsets' cannot 'null' or empty.");
+    }
+
+    //filter out any topics and partitions that do not have offset ranges that will produce data.
+    Map<TopicPartition, Pair<Long, Long>> filteredOffsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      Pair<Long, Long> range = entry.getValue();
+      //the end offset is exclusive, so there is only data to be had when start is strictly less than end
+      if (range.first() < range.second()) {
+        filteredOffsets.put(entry.getKey(), range);
+      } else {
+        LOG.debug("Removing offsets for {} because start is not less than the end offset.", entry.getKey());
+      }
+    }
+
+    //if no range survived the filtering there is nothing to retrieve and the iterable is empty.
+    isEmpty = filteredOffsets.isEmpty();
+    if (isEmpty) {
+      LOG.warn("Iterable for Kafka for is empty because offsets are empty.");
+    }
+
+    this.offsets = filteredOffsets;
+
+    scanPollTime = Long.parseLong(properties.getProperty(KafkaSource.CONSUMER_POLL_TIMEOUT_KEY,
+        Long.toString(KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT)));
+  }
+
+  /**
+   * Assigns the consumer to every partition that has data to read, seeks each one to its start offset and returns
+   * an iterator over the records in range.
+   *
+   * @return an iterator over the records, or an empty iterator when no offset range contains data.
+   */
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    if (isEmpty) {
+      LOG.debug("Returning empty iterator since offsets align.");
+      return Collections.emptyIterator();
+    }
+    //Assign consumer to all of the partitions
+    LOG.debug("Assigning topics and partitions and seeking to start offsets.");
+
+    consumer.assign(new LinkedList<>(offsets.keySet()));
+    //hack so maybe look at removing this
+    consumer.poll(0);
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      consumer.seek(entry.getKey(), entry.getValue().first());
+    }
+
+    return new RecordsIterator<K, V>(consumer, offsets, scanPollTime, maxRetryAttempts);
+  }
+
+  /**
+   * Iterator that polls the consumer until every assigned partition has reached its end offset and then closes
+   * the consumer.
+   */
+  private static class RecordsIterator<K, V> implements Iterator<Pair<K, V>> {
+
+    private final Consumer<K, V> consumer;
+    private final Map<TopicPartition, Pair<Long, Long>> offsets;
+    private final long pollTime;
+    private final int maxNumAttempts;
+    private ConsumerRecords<K, V> records;
+    private Iterator<ConsumerRecord<K, V>> currentIterator;
+
+    /** Partitions that have not yet reached their end offset. */
+    private final Set<TopicPartition> remainingPartitions;
+
+    /** Record fetched ahead of time, or {@code null} when nothing is buffered. */
+    private Pair<K, V> next;
+
+    /** Set once the consumer has been closed so that it is never closed a second time. */
+    private boolean consumerClosed;
+
+    public RecordsIterator(Consumer<K, V> consumer,
+                           Map<TopicPartition, Pair<Long, Long>> offsets, long pollTime, int maxNumRetries) {
+      this.consumer = consumer;
+      remainingPartitions = new HashSet<>(offsets.keySet());
+      this.offsets = offsets;
+      this.pollTime = pollTime;
+      this.maxNumAttempts = maxNumRetries;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (next != null) {
+        return true;
+      }
+
+      //if partitions to consume then pull next value
+      if (!remainingPartitions.isEmpty()) {
+        next = getNext();
+      }
+
+      return next != null;
+    }
+
+    @Override
+    public Pair<K, V> next() {
+      if (next == null) {
+        next = getNext();
+      }
+
+      if (next == null) {
+        throw new NoSuchElementException("No more elements.");
+      }
+
+      Pair<K, V> returnedNext = next;
+      //prime for next call; this also closes the consumer right after the last record was handed out
+      next = getNext();
+      return returnedNext;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove is not supported.");
+    }
+
+    /**
+     * Gets the current iterator, polling Kafka for a new batch when the current one is used up.
+     *
+     * @return the current iterator or {@code null} if the poll returned no records or there are no more
+     * partitions to consume.
+     */
+    private Iterator<ConsumerRecord<K, V>> getIterator() {
+      if (remainingPartitions.isEmpty()) {
+        LOG.debug("No more partitions to consume therefore not retrieving any more records.");
+        return null;
+      }
+
+      if (currentIterator != null && currentIterator.hasNext()) {
+        return currentIterator;
+      }
+
+      LOG.debug("Retrieving next set of records.");
+      records = pollWithRetries();
+
+      if (records == null || records.isEmpty()) {
+        LOG.debug("Retrieved empty records.");
+        currentIterator = null;
+        return null;
+      }
+      currentIterator = records.iterator();
+      return currentIterator;
+    }
+
+    /**
+     * Polls the consumer, retrying on {@link RetriableException} until {@code maxNumAttempts} attempts have failed.
+     * At least one poll is always made, even if the configured number of attempts is zero or negative, so the
+     * caller can never loop without talking to Kafka.
+     *
+     * @return the records returned by the first successful poll.
+     * @throws RetriableException the last failure once the attempts are exhausted.
+     */
+    private ConsumerRecords<K, V> pollWithRetries() {
+      int numTries = 0;
+      while (true) {
+        try {
+          return consumer.poll(pollTime);
+        } catch (RetriableException re) {
+          numTries++;
+          if (numTries < maxNumAttempts) {
+            LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re);
+          } else {
+            LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumAttempts, re);
+            throw re;
+          }
+        }
+      }
+    }
+
+    /**
+     * Internal method for retrieving the next value to retrieve.  Keeps polling while partitions remain and closes
+     * the consumer once every partition has been consumed.
+     *
+     * @return {@code null} if there are no more values to retrieve otherwise the next event.
+     */
+    private Pair<K, V> getNext() {
+      while (!remainingPartitions.isEmpty()) {
+        Iterator<ConsumerRecord<K, V>> iterator = getIterator();
+
+        while (iterator != null && iterator.hasNext()) {
+          ConsumerRecord<K, V> record = iterator.next();
+          TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
+          long offset = record.offset();
+
+          if (withinRange(topicPartition, offset)) {
+            LOG.debug("Retrieving value for {} with offset {}.", topicPartition, offset);
+            return Pair.of(record.key(), record.value());
+          }
+          LOG.debug("Value for {} with offset {} is outside of range skipping.", topicPartition, offset);
+        }
+      }
+
+      //this point is reached again when next() is called on an exhausted iterator, so only close once
+      if (!consumerClosed) {
+        LOG.debug("Closing the consumer because there are no more remaining partitions.");
+        consumer.close();
+        consumerClosed = true;
+      }
+
+      LOG.debug("Consumed data from all partitions.");
+      return null;
+    }
+
+    /**
+     * Checks whether the value for {@code topicPartition} with an {@code offset} is within scan range.  If
+     * the value is not then {@code false} is returned otherwise {@code true}.  When the offset is the last one
+     * in range (or beyond it) the partition is removed from the remaining partitions and paused on the consumer.
+     *
+     * @param topicPartition The partition for the offset
+     * @param offset         the offset in the partition
+     * @return {@code true} if the value is within the expected consumption range, otherwise {@code false}.
+     */
+    private boolean withinRange(TopicPartition topicPartition, long offset) {
+      long endOffset = offsets.get(topicPartition).second();
+      //end offsets are one higher than the last written value.
+      boolean emit = offset < endOffset;
+      if (offset >= endOffset - 1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.",
+              new Object[]{topicPartition, offset, endOffset});
+        }
+        remainingPartitions.remove(topicPartition);
+        consumer.pause(topicPartition);
+      }
+      if (emit) {
+        LOG.debug("Value for partition {} and offset {} is within range.", topicPartition, offset);
+      }
+      return emit;
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
new file mode 100644
index 0000000..485604d
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Crunch Source that will retrieve events from Kafka given start and end offsets. The source is not designed to
+ * process unbounded data but instead to retrieve data between a specified range.
+ * <p>
+ *
+ * The values retrieved from Kafka are returned as raw bytes inside of a {@link BytesWritable}. If callers
+ * need specific parsing logic based on the topic then consumers are encouraged to use multiple Kafka Sources
+ * for each topic and use special {@link DoFn} to parse the payload.
+ */
+public class KafkaSource
+ implements TableSource<BytesWritable, BytesWritable>, ReadableSource<Pair<BytesWritable, BytesWritable>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+ private final FormatBundle inputBundle;
+ private final Properties props;
+ private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+ /**
+ * The consistent PType describing all of the data being retrieved from Kafka as a BytesWritable.
+ */
+ private static PTableType<BytesWritable, BytesWritable> KAFKA_SOURCE_TYPE =
+ Writables.tableOf(Writables.writables(BytesWritable.class), Writables.writables(BytesWritable.class));
+
+ /**
+ * Constant to indicate how long the reader waits before timing out when retrieving data from Kafka.
+ */
+ public static final String CONSUMER_POLL_TIMEOUT_KEY = "org.apache.crunch.kafka.consumer.poll.timeout";
+
+ /**
+ * Default timeout value for {@link #CONSUMER_POLL_TIMEOUT_KEY} of 1 second.
+ */
+ public static final long CONSUMER_POLL_TIMEOUT_DEFAULT = 1000L;
+
+
+ /**
+ * Constructs a Kafka source that will read data from the Kafka cluster identified by the {@code kafkaConnectionProperties}
+ * and from the specific topics and partitions identified in the {@code offsets}
+ * @param kafkaConnectionProperties The connection properties for reading from Kafka. These properties will be honored
+ * with the exception of the {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+ * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ * @param offsets A map of {@link TopicPartition} to a pair of start and end offsets respectively. The start and end offsets
+ * are evaluated at [start, end) where the ending offset is excluded. Each TopicPartition must have a
+ * non-null pair describing its offsets. The start offset should be less than the end offset. If the values
+ * are equal or start is greater than the end then that partition will be skipped.
+ */
+ public KafkaSource(Properties kafkaConnectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) {
+ this.props = copyAndSetProperties(kafkaConnectionProperties);
+
+ inputBundle = createFormatBundle(props, offsets);
+
+ this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
+ }
+
+ @Override
+ public Source<Pair<BytesWritable, BytesWritable>> inputConf(String key, String value) {
+ inputBundle.set(key, value);
+ return this;
+ }
+
+ @Override
+ public PType<Pair<BytesWritable, BytesWritable>> getType() {
+ return KAFKA_SOURCE_TYPE;
+ }
+
+ @Override
+ public Converter<?, ?, ?, ?> getConverter() {
+ return KAFKA_SOURCE_TYPE.getConverter();
+ }
+
+ @Override
+ public PTableType<BytesWritable, BytesWritable> getTableType() {
+ return KAFKA_SOURCE_TYPE;
+ }
+
+ @Override
+ public long getSize(Configuration configuration) {
+ // TODO something smarter here.
+ return 1000L * 1000L * 1000L;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaSource("+props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)+")";
+ }
+
+ @Override
+ public long getLastModifiedAt(Configuration configuration) {
+ LOG.warn("Cannot determine last modified time for source: {}", toString());
+ return -1;
+ }
+
+ private static <K, V> FormatBundle createFormatBundle(Properties kafkaConnectionProperties,
+ Map<TopicPartition, Pair<Long, Long>> offsets) {
+
+ FormatBundle<KafkaInputFormat> bundle = FormatBundle.forInput(KafkaInputFormat.class);
+
+ KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+ for (String name : kafkaConnectionProperties.stringPropertyNames()) {
+ bundle.set(name, kafkaConnectionProperties.getProperty(name));
+ }
+
+ return bundle;
+ }
+
+ private static <K, V> Properties copyAndSetProperties(Properties kakfaConnectionProperties) {
+ Properties props = new Properties();
+ props.putAll(kakfaConnectionProperties);
+
+ //Setting the key/value deserializer to ensure proper translation from Kafka to PType format.
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+
+ return props;
+ }
+
+
+ @Override
+ public Iterable<Pair<BytesWritable, BytesWritable>> read(Configuration conf) throws IOException {
+ // consumer will get closed when the iterable is fully consumed.
+ // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity
+ // of parallelism when reading.
+ Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props);
+ return new KafkaRecordsIterable<>(consumer, offsets, props);
+ }
+
+
+ @Override
+ public void configureSource(Job job, int inputId) throws IOException {
+ Configuration conf = job.getConfiguration();
+ //an id of -1 indicates that this is the only input so just use it directly
+ if (inputId == -1) {
+ job.setMapperClass(CrunchMapper.class);
+ job.setInputFormatClass(inputBundle.getFormatClass());
+ inputBundle.configure(conf);
+ } else {
+ //there are multiple inputs for this mapper so add it as a CrunchInputs and need a fake path just to
+ //make it play well with other file based inputs.
+ Path dummy = new Path("/kafka/" + inputId);
+ CrunchInputs.addInputPath(job, dummy, inputBundle, inputId);
+ }
+ }
+
+ @Override
+ public ReadableData<Pair<BytesWritable, BytesWritable>> asReadable() {
+ // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity
+ // of parallelism when reading.
+ return new KafkaData<>(props, offsets);
+ }
+
+
+ /**
+ * Basic {@link Deserializer} which simply wraps the payload as a {@link BytesWritable}.
+ */
+ public static class BytesDeserializer implements Deserializer<BytesWritable> {
+
+ @Override
+ public void configure(Map<String, ?> configProperties, boolean isKey) {
+ //no-op
+ }
+
+ @Override
+ public BytesWritable deserialize(String topic, byte[] valueBytes) {
+ return new BytesWritable(valueBytes);
+ }
+
+ @Override
+ public void close() {
+ //no-op
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
new file mode 100644
index 0000000..aeea6fb
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
+import kafka.cluster.BrokerEndPoint;
+import kafka.cluster.EndPoint;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.TopicMetadataResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * Simple utilities for retrieving offset and Kafka information to assist in setting up and configuring a
+ * {@link KafkaSource} instance.
+ */
+public class KafkaUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+
+ private static final String CLIENT_ID = "crunch-kafka-client";
+
+ private static final Random RANDOM = new Random();
+
+ /**
+ * Configuration property for the number of retry attempts that will be made to Kafka.
+ */
+ public static final String KAFKA_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.attempts";
+
+ /**
+ * Default number of retry attempts.
+ */
+ public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 5;
+ public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
+
+ /**
+ * Converts the provided {@code config} into a {@link Properties} object to connect with Kafka.
+ * @param config the config to read properties
+ * @return a properties instance populated with all of the values inside the provided {@code config}.
+ */
+ public static Properties getKafkaConnectionProperties(Configuration config) {
+ Properties props = new Properties();
+ for (Map.Entry<String, String> value : config) {
+ props.setProperty(value.getKey(), value.getValue());
+ }
+
+ return props;
+ }
+
+ /**
+ * Adds the {@code properties} to the provided {@code config} instance.
+ * @param properties the properties to add to the config.
+ * @param config the configuration instance to be modified.
+ * @return the config instance with the populated properties
+ */
+ public static Configuration addKafkaConnectionProperties(Properties properties, Configuration config) {
+ for (String name : properties.stringPropertyNames()) {
+ config.set(name, properties.getProperty(name));
+ }
+ return config;
+ }
+
+ /**
+ * Returns a {@link TopicMetadataRequest} from the given topics
+ *
+ * @param topics an array of topics you want metadata for
+ * @return a {@link TopicMetadataRequest} from the given topics
+ * @throws IllegalArgumentException if topics is {@code null} or empty, or if any of the topics is null, empty or blank
+ */
+ private static TopicMetadataRequest getTopicMetadataRequest(String... topics) {
+ if (topics == null)
+ throw new IllegalArgumentException("topics cannot be null");
+ if (topics.length == 0)
+ throw new IllegalArgumentException("topics cannot be empty");
+
+ for (String topic : topics)
+ if (StringUtils.isBlank(topic))
+ throw new IllegalArgumentException("No topic can be null, empty or blank");
+
+ return new TopicMetadataRequest(Arrays.asList(topics));
+ }
+
+ /**
+ * <p>
+ * Retrieves the offset values for an array of topics at the specified time.
+ * </p>
+ * <p>
+ * If the Kafka cluster does not have the logs for the partition at the specified time or if the topic did not exist
+ * at that time this will instead return the earliest offset for that partition.
+ * </p>
+ *
+ * @param properties the properties containing the configuration for kafka
+ * @param time the time at which we want to know what the offset values were
+ * @param topics the topics we want to know the offset values of
+ * @return the offset values for an array of topics at the specified time
+ * @throws IllegalArgumentException if properties is {@code null} or if topics is {@code null} or empty or if any of
+ * the topics are {@code null}, empty or blank, or if there is an error parsing the
+ * properties.
+ * @throws IllegalStateException if there is an error communicating with the Kafka cluster to retrieve information.
+ */
+ public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) {
+ if (properties == null)
+ throw new IllegalArgumentException("properties cannot be null");
+
+ final List<Broker> brokers = getBrokers(properties);
+ Collections.shuffle(brokers, RANDOM);
+
+ return getBrokerOffsets(brokers, time, topics);
+ }
+
+ // Visible for testing
+ static Map<TopicPartition, Long> getBrokerOffsets(List<Broker> brokers, long time, String... topics) {
+ if (topics == null)
+ throw new IllegalArgumentException("topics cannot be null");
+ if (topics.length == 0)
+ throw new IllegalArgumentException("topics cannot be empty");
+
+ for (String topic : topics)
+ if (StringUtils.isBlank(topic))
+ throw new IllegalArgumentException("No topic can be null, empty or blank");
+
+ TopicMetadataResponse topicMetadataResponse = null;
+
+ final TopicMetadataRequest topicMetadataRequest = getTopicMetadataRequest(topics);
+
+ for (final Broker broker : brokers) {
+ final SimpleConsumer consumer = getSimpleConsumer(broker);
+ try {
+ topicMetadataResponse = consumer.send(topicMetadataRequest);
+ break;
+ } catch (Exception err) {
+ EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+ LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed",
+ Arrays.toString(topics), endpoint.host()), err);
+ } finally {
+ consumer.close();
+ }
+ }
+
+ if (topicMetadataResponse == null) {
+ throw new IllegalStateException(String.format("Fetching topic metadata for topic(s) '%s' from broker(s) '%s' failed",
+ Arrays.toString(topics), Arrays.toString(brokers.toArray())));
+ }
+
+ // From the topic metadata, build a PartitionOffsetRequestInfo for each partition of each topic. It should be noted that
+ // only the leader Broker has the partition offset information[1] so save the leader Broker so we
+ // can send the request to it.
+ // [1] - https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-OffsetAPI
+ Map<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequests =
+ new HashMap<>();
+
+ for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
+ for (PartitionMetadata partition : metadata.partitionsMetadata()) {
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
+ new HashMap<>();
+
+ BrokerEndPoint brokerEndPoint = partition.leader();
+ Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT);
+
+ if (brokerRequests.containsKey(leader))
+ requestInfo = brokerRequests.get(leader);
+
+ requestInfo.put(new TopicAndPartition(metadata.topic(), partition.partitionId()), new PartitionOffsetRequestInfo(
+ time, 1));
+
+ brokerRequests.put(leader, requestInfo);
+ }
+ }
+
+ Map<TopicPartition, Long> topicPartitionToOffset = new HashMap<>();
+
+ // Send the offset request to the leader broker
+ for (Map.Entry<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequest : brokerRequests.entrySet()) {
+ SimpleConsumer simpleConsumer = getSimpleConsumer(brokerRequest.getKey());
+
+ OffsetResponse offsetResponse = null;
+ try {
+ OffsetRequest offsetRequest = new OffsetRequest(brokerRequest.getValue(), kafka.api.OffsetRequest.CurrentVersion(),
+ CLIENT_ID);
+ offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
+ } finally {
+ simpleConsumer.close();
+ }
+
+ Map<TopicPartition, Long> earliestOffsets = null;
+
+ // Retrieve/parse the results
+ for (Map.Entry<TopicAndPartition, PartitionOffsetRequestInfo> entry : brokerRequest.getValue().entrySet()) {
+ TopicAndPartition topicAndPartition = entry.getKey();
+ TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
+ long[] offsets = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition());
+ long offset;
+
+ // The Kafka API will return no value if a time is given which there is no log that contains messages from that time
+ // (i.e. before a topic existed or in a log that was rolled/cleaned)
+ if (offsets.length > 0) {
+ offset = offsets[0];
+ } else {
+ LOG.info("Kafka did not have an offset for topic/partition [{}]. Returning earliest known offset instead",
+ topicAndPartition);
+
+ // This shouldn't happen but if kafka's API did not provide us with a value and we are asking for the earliest
+ // time we can't be sure what to do so quit
+ if (time == kafka.api.OffsetRequest.EarliestTime())
+ throw new IllegalStateException("We requested the earliest offsets for topic [" + topicAndPartition.topic()
+ + "] but Kafka returned no values");
+
+ // Load the earliest offsets for the topic if it hasn't been loaded already
+ if (earliestOffsets == null)
+ earliestOffsets = getBrokerOffsets(Arrays.asList(brokerRequest.getKey()),
+ kafka.api.OffsetRequest.EarliestTime(), topicAndPartition.topic());
+
+ offset = earliestOffsets.get(topicPartition);
+ }
+
+ topicPartitionToOffset.put(topicPartition, offset);
+ }
+ }
+
+ return topicPartitionToOffset;
+ }
+
+ /**
+ * Returns a {@link SimpleConsumer} connected to the given {@link Broker}
+ */
+ private static SimpleConsumer getSimpleConsumer(final Broker broker) {
+ // BrokerHost, BrokerPort, timeout, buffer size, client id
+ EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+ return new SimpleConsumer(endpoint.host(), endpoint.port(), 100000, 64 * 1024, CLIENT_ID);
+ }
+
+ /**
+ * Returns a {@link Broker} list from the given {@link Properties}
+ *
+ * @param properties the {@link Properties} with configuration to connect to a Kafka broker
+ */
+ private static List<Broker> getBrokers(final Properties properties) {
+ if (properties == null)
+ throw new IllegalArgumentException("props cannot be null");
+
+ String commaDelimitedBrokerList = properties.getProperty("metadata.broker.list");
+ if (commaDelimitedBrokerList == null)
+ throw new IllegalArgumentException("Unable to find 'metadata.broker.list' in given properties");
+
+ // Split broker list into host/port pairs
+ String[] brokerPortList = commaDelimitedBrokerList.split(",");
+ if (brokerPortList.length < 1)
+ throw new IllegalArgumentException("Unable to parse broker list : [" + Arrays.toString(brokerPortList) + "]");
+
+ final List<Broker> brokers = new ArrayList<Broker>(brokerPortList.length);
+ for (final String brokerHostPortString : brokerPortList) {
+ // Split host/port
+ String[] brokerHostPort = brokerHostPortString.split(":");
+ if (brokerHostPort.length != 2)
+ throw new IllegalArgumentException("Unable to parse host/port from broker string : ["
+ + Arrays.toString(brokerHostPort) + "] from broker list : [" + Arrays.toString(brokerPortList) + "]");
+ try {
+ brokers.add(new Broker(0, brokerHostPort[0], Integer.parseInt(brokerHostPort[1]), SecurityProtocol.PLAINTEXT));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Error parsing broker port : " + brokerHostPort[1], e);
+ }
+ }
+ return brokers;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
new file mode 100644
index 0000000..eba4a97
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Basic input format for reading data from Kafka. Data is read and maintained in its pure byte form and wrapped
+ * inside of a {@link BytesWritable} instance.
+ *
+ * Populating the configuration of the input format is handled with the convenience method of
+ * {@link #writeOffsetsToConfiguration(Map, Configuration)}. This should be done to ensure
+ * the Kafka offset information is available when the input format {@link #getSplits(JobContext) creates its splits}
+ * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}.
+ */
+public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable {
+
+ /**
+ * Constant for constructing configuration keys for the input format.
+ */
+ private static final String KAFKA_INPUT_OFFSETS_BASE = "org.apache.crunch.kafka.offsets.topic";
+
+ /**
+ * Constant used for building configuration keys and specifying partitions.
+ */
+ private static final String PARTITIONS = "partitions";
+
+ /**
+ * Constant used for building configuration keys and specifying the start of a partition.
+ */
+ private static final String START = "start";
+
+ /**
+ * Constant used for building configuration keys and specifying the end of a partition.
+ */
+ private static final String END = "end";
+
+ /**
+ * Regex to discover all of the defined partitions which should be consumed by the input format.
+ */
+ private static final String TOPIC_KEY_REGEX = KAFKA_INPUT_OFFSETS_BASE + "\\..*\\." + PARTITIONS + "$";
+
+ private Configuration configuration;
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+ Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf());
+ List<InputSplit> splits = new LinkedList<>();
+ for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+ TopicPartition topicPartition = entry.getKey();
+
+ long start = entry.getValue().first();
+ long end = entry.getValue().second();
+ if(start != end) {
+ splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), entry.getValue().first(),
+ entry.getValue().second()));
+ }
+ }
+
+ return splits;
+ }
+
+ @Override
+ public RecordReader<BytesWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return new KafkaRecordReader<>();
+ }
+
+ @Override
+ public void setConf(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return configuration;
+ }
+
+
+ //The following methods are used for reading and writing Kafka Partition offset information into Hadoop's Configuration
+ //objects and into Crunch's FormatBundle. For a specific Kafka Topic it might have one or many partitions and for
+ //each partition it will need a start and end offset. Assuming you have a topic of "abc" and it has 2 partitions the
+ //configuration would be populated with the following:
+ // org.apache.crunch.kafka.offsets.topic.abc.partitions = [0,1]
+ // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = <partition start>
+ // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end = <partition end>
+ // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start = <partition start>
+ // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end = <partition end>
+
+ /**
+ * Writes the start and end offsets for the provided topic partitions to the {@code bundle}.
+ * @param offsets The starting and ending offsets for the topics and partitions.
+ * @param bundle the bundle into which the information should be persisted.
+ */
+ public static void writeOffsetsToBundle(Map<TopicPartition, Pair<Long, Long>> offsets, FormatBundle bundle) {
+ for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) {
+ bundle.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Writes the start and end offsets for the provided topic partitions to the {@code config}.
+ * @param offsets The starting and ending offsets for the topics and partitions.
+ * @param config the config into which the information should be persisted.
+ */
+ public static void writeOffsetsToConfiguration(Map<TopicPartition, Pair<Long, Long>> offsets, Configuration config) {
+ for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) {
+ config.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Reads the {@code configuration} to determine which topics, partitions, and offsets should be used for reading data.
+ *
+ * @param configuration the configuration to derive the data to read.
+ * @return a map of {@link TopicPartition} to a pair of start and end offsets.
+ * @throws IllegalStateException if the {@code configuration} does not have the start and end offsets set properly
+ * for a partition.
+ */
+ public static Map<TopicPartition, Pair<Long, Long>> getOffsets(Configuration configuration) {
+ Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+ //find configuration for all of the topics with defined partitions
+ Map<String, String> topicPartitionKeys = configuration.getValByRegex(TOPIC_KEY_REGEX);
+
+ //for each topic start to process its partitions
+ for (String key : topicPartitionKeys.keySet()) {
+ String topic = getTopicFromKey(key);
+ int[] partitions = configuration.getInts(key);
+ //for each partition find and add the start/end offset
+ for (int partitionId : partitions) {
+ TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+ //Long.MIN_VALUE is the default used to detect a start or end offset that was never configured
+ long start = configuration.getLong(generatePartitionStartKey(topic, partitionId), Long.MIN_VALUE);
+ long end = configuration.getLong(generatePartitionEndKey(topic, partitionId), Long.MIN_VALUE);
+
+ if (start == Long.MIN_VALUE || end == Long.MIN_VALUE) {
+ throw new IllegalStateException("The " + topicPartition + " has an invalid start:" + start + " or end:" + end
+ + " offset configured.");
+ }
+
+ offsets.put(topicPartition, Pair.of(start, end));
+ }
+ }
+
+ return offsets;
+ }
+
+ //Builds the key/value pairs to persist: a start and end entry per partition plus a partition list per topic.
+ private static Map<String, String> generateValues(Map<TopicPartition, Pair<Long, Long>> offsets) {
+ Map<String, String> offsetConfigValues = new HashMap<>();
+ Map<String, Set<Integer>> topicsPartitions = new HashMap<>();
+ for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+ TopicPartition topicPartition = entry.getKey();
+ String topic = topicPartition.topic();
+ int partition = topicPartition.partition();
+ String startKey = generatePartitionStartKey(topic, partition);
+ String endKey = generatePartitionEndKey(topic, partition);
+ //Add the start and end offsets for a specific partition
+ offsetConfigValues.put(startKey, Long.toString(entry.getValue().first()));
+ offsetConfigValues.put(endKey, Long.toString(entry.getValue().second()));
+
+ Set<Integer> partitions = topicsPartitions.get(topic);
+ if (partitions == null) {
+ partitions = new HashSet<>();
+ topicsPartitions.put(topic, partitions);
+ }
+ partitions.add(partition);
+ }
+
+ //generate the partitions values for each topic, using the same key builder as the rest of the class
+ for (Map.Entry<String, Set<Integer>> entry : topicsPartitions.entrySet()) {
+ String key = generateTopicPartitionsKey(entry.getKey());
+ Set<Integer> partitions = entry.getValue();
+ String partitionsString = StringUtils.join(partitions, ",");
+ offsetConfigValues.put(key, partitionsString);
+ }
+
+ return offsetConfigValues;
+ }
+
+ static String generatePartitionStartKey(String topic, int partition) {
+ return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + START;
+ }
+
+ static String generatePartitionEndKey(String topic, int partition) {
+ return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + END;
+ }
+
+ static String generateTopicPartitionsKey(String topic) {
+ return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS;
+ }
+
+ static String getTopicFromKey(String key) {
+ //strip off the base key + a trailing "."
+ String value = key.substring(KAFKA_INPUT_OFFSETS_BASE.length() + 1);
+ //strip off the end part + a preceding "."
+ value = value.substring(0, (value.length() - (PARTITIONS.length() + 1)));
+
+ return value;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
new file mode 100644
index 0000000..c8ebc6a
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * InputSplit that represent retrieving data from a single {@link TopicPartition} between the specified start
+ * and end offsets.
+ */
+public class KafkaInputSplit extends InputSplit implements Writable {
+
+ private long startingOffset;
+ private long endingOffset;
+ private TopicPartition topicPartition;
+
+ /**
+ * Nullary Constructor for creating the instance inside the Mapper instance.
+ */
+ public KafkaInputSplit() {
+
+ }
+
+ /**
+ * Constructs an input split for the provided {@code topic} and {@code partition} restricting data to be between
+ * the {@code startingOffset} and {@code endingOffset}
+ * @param topic the topic for the split
+ * @param partition the partition for the topic
+ * @param startingOffset the start of the split
+ * @param endingOffset the end of the split
+ */
+ public KafkaInputSplit(String topic, int partition, long startingOffset, long endingOffset) {
+ this.startingOffset = startingOffset;
+ this.endingOffset = endingOffset;
+ topicPartition = new TopicPartition(topic, partition);
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ // This is just used as a hint for size of bytes so it is already inaccurate.
+ return startingOffset > 0 ? endingOffset - startingOffset : endingOffset;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ //Leave empty since data locality not really an issue.
+ return new String[0];
+ }
+
+ /**
+ * Returns the topic and partition for the split
+ * @return the topic and partition for the split
+ */
+ public TopicPartition getTopicPartition() {
+ return topicPartition;
+ }
+
+ /**
+ * Returns the starting offset for the split
+ * @return the starting offset for the split
+ */
+ public long getStartingOffset() {
+ return startingOffset;
+ }
+
+ /**
+ * Returns the ending offset for the split
+ * @return the ending offset for the split
+ */
+ public long getEndingOffset() {
+ return endingOffset;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeUTF(topicPartition.topic());
+ dataOutput.writeInt(topicPartition.partition());
+ dataOutput.writeLong(startingOffset);
+ dataOutput.writeLong(endingOffset);
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ String topic = dataInput.readUTF();
+ int partition = dataInput.readInt();
+ startingOffset = dataInput.readLong();
+ endingOffset = dataInput.readLong();
+
+ topicPartition = new TopicPartition(topic, partition);
+ }
+
+ @Override
+ public String toString() {
+ return getTopicPartition() + " Start: " + startingOffset + " End: " + endingOffset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
new file mode 100644
index 0000000..1420519
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
+import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
+import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;
+
+/**
+ * A {@link RecordReader} for pulling data from Kafka.
+ * @param <K> the key of the records from Kafka
+ * @param <V> the value of the records from Kafka
+ */
+public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
+
+ private Consumer<K, V> consumer;
+ private ConsumerRecord<K, V> record;
+ private long endingOffset;
+ private Iterator<ConsumerRecord<K, V>> recordIterator;
+ private long consumerPollTimeout;
+ private long maxNumberOfRecords;
+ private long startingOffset;
+ private int maxNumberAttempts;
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ consumer = new KafkaConsumer<>(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
+ KafkaInputSplit split = (KafkaInputSplit) inputSplit;
+ TopicPartition topicPartition = split.getTopicPartition();
+ consumer.assign(Collections.singletonList(topicPartition));
+ //suggested hack to gather info without gathering data
+ consumer.poll(0);
+ //now seek to the desired start location
+ startingOffset = split.getStartingOffset();
+ consumer.seek(topicPartition,startingOffset);
+
+ endingOffset = split.getEndingOffset();
+
+ maxNumberOfRecords = endingOffset - split.getStartingOffset();
+ if(LOG.isInfoEnabled()) {
+ LOG.info("Reading data from {} between {} and {}", new Object[]{topicPartition, startingOffset, endingOffset});
+ }
+
+ Configuration config = taskAttemptContext.getConfiguration();
+ consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
+ maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ recordIterator = getRecords();
+ record = recordIterator.hasNext() ? recordIterator.next() : null;
+ if(LOG.isDebugEnabled()){
+ if(record != null) {
+ LOG.debug("nextKeyValue: Retrieved record with offset {}", record.offset());
+ }else{
+ LOG.debug("nextKeyValue: Retrieved null record");
+ }
+ }
+ return record != null && record.offset() < endingOffset;
+ }
+
+ @Override
+ public K getCurrentKey() throws IOException, InterruptedException {
+ return record == null ? null : record.key();
+ }
+
+ @Override
+ public V getCurrentValue() throws IOException, InterruptedException {
+ return record == null ? null : record.value();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ //offset-based estimate clamped to [0, 1]; 0 before the first record or when the split has no positive range
+ return (record == null || maxNumberOfRecords <= 0) ? 0.0f : Math.min(1.0f, Math.max(0.0f, ((float) (record.offset() - startingOffset)) / maxNumberOfRecords));
+ }
+
+ //Returns the current batch while it still has records; otherwise polls Kafka for the next batch, retrying on
+ //RetriableException until maxNumberAttempts is reached. Yields an empty iterator when no poll succeeded.
+ private Iterator<ConsumerRecord<K, V>> getRecords() {
+ if (recordIterator != null && recordIterator.hasNext()) {
+ return recordIterator;
+ }
+ ConsumerRecords<K, V> records = null;
+ int numTries = 0;
+ boolean success = false;
+ while (!success && numTries < maxNumberAttempts) {
+ try {
+ records = consumer.poll(consumerPollTimeout);
+ success = true;
+ } catch (RetriableException re) {
+ numTries++;
+ if (numTries < maxNumberAttempts) {
+ LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re);
+ } else {
+ LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumberAttempts, re);
+ throw re;
+ }
+ }
+ }
+
+ boolean noRecords = records == null || records.isEmpty();
+ LOG.debug(noRecords ? "No records retrieved from Kafka therefore nothing to iterate over."
+ : "Retrieved records from Kafka to iterate over.");
+ return records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.debug("Closing the record reader.");
+ if(consumer != null) {
+ consumer.close();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
new file mode 100644
index 0000000..836039c
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormatIT;
+import org.apache.crunch.kafka.inputformat.KafkaRecordReaderIT;
+import org.apache.crunch.kafka.utils.KafkaBrokerTestHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ //org.apache.crunch.kafka
+ KafkaSourceIT.class, KafkaRecordsIterableIT.class, KafkaDataIT.class,
+ //org.apache.crunch.kafka.inputformat
+ KafkaRecordReaderIT.class, KafkaInputFormatIT.class, KafkaUtilsIT.class,
+})
+public class ClusterTest {
+
+
+ private static TemporaryFolder folder = new TemporaryFolder();
+ private static KafkaBrokerTestHarness kafka;
+ private static boolean runAsSuite = false;
+ private static Configuration conf;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void startSuite() throws Exception {
+ runAsSuite = true;
+ startKafka();
+ setupFileSystem();
+ }
+
+ @AfterClass
+ public static void endSuite() throws Exception {
+ stopKafka();
+ }
+
+ public static void startTest() throws Exception {
+ if (!runAsSuite) {
+ startKafka();
+ setupFileSystem();
+ }
+ }
+
+ public static void endTest() throws Exception {
+ if (!runAsSuite) {
+ stopKafka();
+ }
+ }
+
+ private static void stopKafka() throws IOException {
+ kafka.tearDown();
+ }
+
+ private static void startKafka() throws IOException {
+ Properties props = new Properties();
+ props.setProperty("auto.create.topics.enable", Boolean.TRUE.toString());
+
+ kafka = new KafkaBrokerTestHarness(props);
+ kafka.setUp();
+ }
+
+ private static void setupFileSystem() throws IOException {
+ folder.create();
+
+ conf = new Configuration();
+ conf.set(RuntimeParameters.TMP_DIR, folder.getRoot().getAbsolutePath());
+ // Run Map/Reduce tests in process.
+ conf.set("mapreduce.jobtracker.address", "local");
+ }
+
+ public static Configuration getConf() {
+ // Clone the configuration so it doesn't get modified for other tests.
+ return new Configuration(conf);
+ }
+
+ public static Properties getConsumerProperties() {
+ Properties props = new Properties();
+ props.putAll(kafka.getProps());
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName());
+ //set this because still needed by some APIs.
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list"));
+ props.setProperty("enable.auto.commit", Boolean.toString(false));
+
+ //when set this causes some problems with initializing the consumer.
+ props.remove(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ return props;
+ }
+
+ public static Properties getProducerProperties() {
+ Properties props = new Properties();
+ props.putAll(kafka.getProps());
+ //set this because still needed by some APIs.
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list"));
+ return props;
+ }
+
+ public static Configuration getConsumerConfig() {
+ Configuration kafkaConfig = new Configuration(conf);
+ KafkaUtils.addKafkaConnectionProperties(getConsumerProperties(), kafkaConfig);
+ return kafkaConfig;
+ }
+
+ public static List<String> writeData(Properties props, String topic, String batch, int loops, int numValuesPerLoop) {
+ Properties producerProps = new Properties();
+ producerProps.putAll(props);
+ producerProps.setProperty("serializer.class", StringEncoderDecoder.class.getName());
+ producerProps.setProperty("key.serializer.class", StringEncoderDecoder.class.getName());
+
+ // Set the default compression used to be snappy
+ producerProps.setProperty("compression.codec", "snappy");
+ producerProps.setProperty("request.required.acks", "1");
+
+ ProducerConfig producerConfig = new ProducerConfig(producerProps);
+
+ Producer<String, String> producer = new Producer<>(producerConfig);
+ List<String> keys = new LinkedList<>();
+ try {
+ for (int i = 0; i < loops; i++) {
+ List<KeyedMessage<String, String>> events = new LinkedList<>();
+ for (int j = 0; j < numValuesPerLoop; j++) {
+ String key = "key" + batch + i + j;
+ String value = "value" + batch + i + j;
+ keys.add(key);
+ events.add(new KeyedMessage<>(topic, key, value));
+ }
+ producer.send(events);
+ }
+ } finally {
+ producer.close();
+ }
+ return keys;
+ }
+
+
+ public static class StringSerDe implements Serializer<String>, Deserializer<String> {
+ //Test String serializer/deserializer. UTF-8 is explicit so results do not depend on the platform charset.
+ @Override
+ public void configure(Map map, boolean b) {
+ //no configuration is read
+ }
+
+ @Override
+ public byte[] serialize(String topic, String value) {
+ return value == null ? null : value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String deserialize(String topic, byte[] bytes) {
+ return bytes == null ? null : new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void close() {
+ //nothing to release
+ }
+ }
+
+ public static class StringEncoderDecoder implements Encoder<String>, Decoder<String> {
+ //Test String encoder/decoder set as the producer's serializer.class in writeData; UTF-8 to match StringSerDe.
+ public StringEncoderDecoder() {
+ //intentionally empty
+ }
+
+ public StringEncoderDecoder(VerifiableProperties props) {
+ //props are not used
+ }
+
+ @Override
+ public String fromBytes(byte[] bytes) {
+ return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] toBytes(String value) {
+ return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
new file mode 100644
index 0000000..595a94b
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.crunch.kafka.ClusterTest.writeData;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class KafkaDataIT {
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private String topic;
+ private Map<TopicPartition, Long> startOffsets;
+ private Map<TopicPartition, Long> stopOffsets;
+ private Map<TopicPartition, Pair<Long, Long>> offsets;
+ private Properties props;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ ClusterTest.startTest();
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ ClusterTest.endTest();
+ }
+
+ @Before
+ public void setup() {
+ topic = testName.getMethodName();
+
+ props = ClusterTest.getConsumerProperties();
+
+ startOffsets = new HashMap<>();
+ stopOffsets = new HashMap<>();
+ offsets = new HashMap<>();
+ for (int i = 0; i < 4; i++) {
+ TopicPartition tp = new TopicPartition(topic, i);
+ startOffsets.put(tp, 0L);
+ stopOffsets.put(tp, 100L);
+
+ offsets.put(tp, Pair.of(0L, 100L));
+ }
+ }
+
+ @Test
+ public void getDataIterable() throws IOException {
+ int loops = 10;
+ int numPerLoop = 100;
+ int total = loops * numPerLoop;
+ List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+ startOffsets = getStartOffsets(props, topic);
+ stopOffsets = getStopOffsets(props, topic);
+
+ Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+ offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+ }
+
+ Iterable<Pair<String, String>> data = new KafkaData<String, String>(props, offsets).read(null);
+
+ int count = 0;
+ for (Pair<String, String> event : data) {
+ assertThat(keys, hasItem(event.first()));
+ assertTrue(keys.remove(event.first()));
+ count++;
+ }
+
+ assertThat(count, is(total));
+ assertThat(keys.size(), is(0));
+ }
+
+ private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
+ return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
+ }
+
+ private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
+ return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
+ }
+}
[3/3] crunch git commit: CRUNCH-606: Handle setting version correctly
and removed stray System.out in test.
Posted by mk...@apache.org.
CRUNCH-606: Handle setting version correctly and removed stray System.out in test.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/360d72a4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/360d72a4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/360d72a4
Branch: refs/heads/master
Commit: 360d72a4f887505e020fdb8f99c3ccb1800693f6
Parents: 321cfef
Author: Micah Whitacre <mk...@gmail.com>
Authored: Mon May 23 15:13:02 2016 -0500
Committer: Micah Whitacre <mk...@gmail.com>
Committed: Mon May 23 15:13:55 2016 -0500
----------------------------------------------------------------------
crunch-kafka/pom.xml | 2 +-
.../test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java | 1 -
2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/360d72a4/crunch-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml
index a96a9b0..7d6256b 100644
--- a/crunch-kafka/pom.xml
+++ b/crunch-kafka/pom.xml
@@ -22,7 +22,7 @@ under the License.
<parent>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-parent</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.14.1-SNAPSHOT</version>
</parent>
<artifactId>crunch-kafka</artifactId>
http://git-wip-us.apache.org/repos/asf/crunch/blob/360d72a4/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
index ce97ec1..36f293f 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
@@ -177,7 +177,6 @@ public class KafkaRecordsIterableIT {
Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
- System.out.println(entry.getKey()+ "start:"+entry.getValue()+":end:"+stopOffsets.get(entry.getKey()));
}
Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());