You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2016/05/23 20:21:48 UTC

[1/3] crunch git commit: CRUNCH-606: Kafka Source for Crunch which supports reading data as BytesWritable

Repository: crunch
Updated Branches:
  refs/heads/master c09c4ee2d -> 360d72a4f


http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
new file mode 100644
index 0000000..ce97ec1
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.crunch.kafka.ClusterTest.writeData;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordsIterableIT {
+
+  @Mock
+  private Consumer<String, String> mockedConsumer;
+
+  @Mock
+  private ConsumerRecords<String, String> records;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private String topic;
+  private Map<TopicPartition, Long> startOffsets;
+  private Map<TopicPartition, Long> stopOffsets;
+  private Map<TopicPartition, Pair<Long, Long>> offsets;
+  private Consumer<String, String> consumer;
+  private Properties props;
+  private Properties consumerProps;
+
+  /**
+   * Runs once before any test in this class and delegates to {@code ClusterTest.startTest()}.
+   * NOTE(review): presumably this brings up the shared Kafka test cluster; confirm in ClusterTest.
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  /**
+   * Runs once after all tests in this class and delegates to {@code ClusterTest.endTest()}.
+   * NOTE(review): presumably this tears down whatever {@code ClusterTest.startTest()} started; confirm.
+   */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  /**
+   * Prepares per-test state: the topic is named after the running test method, partitions 0-3 of
+   * that topic get the default offset range 0..100, and {@code consumerProps} becomes a copy of
+   * the properties returned by {@code ClusterTest.getConsumerProperties()}.
+   */
+  @Before
+  public void setup() {
+    topic = testName.getMethodName();
+    props = ClusterTest.getConsumerProperties();
+
+    // Work on a copy so a test can adjust consumer settings without touching 'props'.
+    consumerProps = new Properties();
+    consumerProps.putAll(props);
+
+    startOffsets = new HashMap<>();
+    stopOffsets = new HashMap<>();
+    offsets = new HashMap<>();
+    for (int partition = 0; partition < 4; partition++) {
+      TopicPartition topicPartition = new TopicPartition(topic, partition);
+      startOffsets.put(topicPartition, 0L);
+      stopOffsets.put(topicPartition, 100L);
+      offsets.put(topicPartition, Pair.of(0L, 100L));
+    }
+  }
+
+  /**
+   * Per-test teardown hook; currently a no-op.
+   * NOTE(review): the KafkaConsumer instances created by individual tests are not closed here;
+   * confirm whether they are closed elsewhere or should be closed in this method.
+   */
+  @After
+  public void shutdown() {
+  }
+
+
+  /** Constructing the iterable with a {@code null} consumer must fail with IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  public void nullConsumer() {
+    // Explicit type arguments instead of the raw type, so the call is fully type-checked.
+    new KafkaRecordsIterable<String, String>(null, offsets, new Properties());
+  }
+
+  /** Constructing the iterable with {@code null} offsets must fail with IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  public void nullOffsets() {
+    // Pass the mock rather than the 'consumer' field, which is still null in this test; with a
+    // null consumer the expected exception could be raised for the consumer argument and the
+    // offsets validation would never be exercised.
+    new KafkaRecordsIterable<>(mockedConsumer, null, new Properties());
+  }
+
+  /** Constructing the iterable with an empty offsets map must fail with IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  public void emptyOffsets() {
+    // The mock is sufficient for argument validation and avoids creating a real KafkaConsumer
+    // that this test would never close.
+    new KafkaRecordsIterable<String, String>(mockedConsumer,
+        Collections.<TopicPartition, Pair<Long, Long>>emptyMap(), new Properties());
+  }
+
+  /** Constructing the iterable with {@code null} properties must fail with IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  public void nullProperties() {
+    // Pass the mock rather than the 'consumer' field, which is still null in this test; with a
+    // null consumer the properties validation might never be reached.
+    new KafkaRecordsIterable<String, String>(mockedConsumer, offsets, null);
+  }
+
+  /**
+   * Writes data to the topic, then iterates from the earliest to the latest broker offsets of every
+   * partition and checks that each key returned by {@code writeData} is seen exactly once.
+   */
+  @Test
+  public void iterateOverValues() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+    int total = loops * numPerLoop;
+    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+    // Offsets are fetched after the write so the range covers everything just written.
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    // Pair each partition's earliest offset with its latest offset.
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      assertThat(keys, hasItem(event.first()));
+      // Removing the key guards against the same record being returned twice.
+      assertTrue(keys.remove(event.first()));
+      count++;
+    }
+
+    assertThat(count, is(total));
+    assertThat(keys.size(), is(0));
+  }
+
+  /**
+   * Same as the full iteration test but with a single write ({@code loops = 1},
+   * {@code numPerLoop = 1}), checking that the smallest possible range is still read completely.
+   */
+  @Test
+  public void iterateOverOneValue() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 1;
+    int numPerLoop = 1;
+    int total = loops * numPerLoop;
+    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+    // Offsets are fetched after the write so the range covers everything just written.
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    // Pair each partition's earliest offset with its latest offset.
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      assertThat(keys, hasItem(event.first()));
+      // Removing the key guards against the same record being returned twice.
+      assertTrue(keys.remove(event.first()));
+      count++;
+    }
+
+    assertThat(count, is(total));
+    assertThat(keys.size(), is(0));
+  }
+
+  /**
+   * Uses the earliest broker offsets as both start and stop of every partition, so the requested
+   * range is empty and the iterable must yield no records even though data exists in the topic.
+   */
+  @Test
+  public void iterateOverNothing() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+    writeData(props, topic, "batch", loops, numPerLoop);
+
+    //set the start offsets equal to the stop so won't iterate over anything
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStartOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      count++;
+    }
+
+    assertThat(count, is(0));
+  }
+
+  /**
+   * Requests only the first {@code numPerPartition} offsets of every partition and checks that
+   * exactly that many records per partition are returned.
+   */
+  @Test
+  public void iterateOverPartial() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+    int numPerPartition = 50;
+
+    writeData(props, topic, "batch", loops, numPerLoop);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    //stop each partition numPerPartition offsets after its earliest offset so only part of the data is read
+    startOffsets = getStartOffsets(props, topic);
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), entry.getValue() + numPerPartition));
+    }
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      count++;
+    }
+
+    assertThat(count, is(startOffsets.size() * numPerPartition));
+  }
+
+  /**
+   * Captures the offset range after a first batch is written, then writes a second batch before
+   * iterating. Only keys from the first batch may be returned; records written after the stop
+   * offsets were captured must not appear.
+   */
+  @Test
+  public void dontIteratePastStop() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+
+    List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
+
+    //capture the range covering only the first batch; the second batch is written after this point
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      assertThat(keys, hasItem(event.first()));
+      assertTrue(keys.remove(event.first()));
+      assertThat(secondKeys, not(hasItem(event.first())));
+      count++;
+    }
+
+    assertThat(count, is(loops * numPerLoop));
+    assertThat(keys.size(), is(0));
+  }
+
+  /**
+   * Uses the latest offsets after a first batch as the start of the range and the latest offsets
+   * after a second batch as the stop. Only keys from the second batch may be returned.
+   */
+  @Test
+  public void iterateSkipInitialValues() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+
+    List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
+
+    //start at the latest offsets after the first batch so that the first batch is skipped
+    startOffsets = getStopOffsets(props, topic);
+
+    List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
+
+    stopOffsets = getStopOffsets(props, topic);
+
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets,
+        new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      assertThat(secondKeys, hasItem(event.first()));
+      assertTrue(secondKeys.remove(event.first()));
+      assertThat(keys, not(hasItem(event.first())));
+      count++;
+    }
+
+    assertThat(count, is(loops * numPerLoop));
+    assertThat(secondKeys.size(), is(0));
+  }
+
+  /**
+   * Drives the iterable with a mocked consumer whose {@code poll} first returns {@code null},
+   * then throws two TimeoutExceptions, then returns the stubbed records. All stubbed records
+   * must still be returned, i.e. the two failures are expected to be retried.
+   */
+  @Test
+  public void iterateValuesWithExceptions() {
+    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+
+    // 25 records (offsets 0-24) for each of partitions 0-3
+    for(int i = 0; i < 25; i++){
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 0, i, "key", null));
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 1, i, "key", null));
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 2, i, "key", null));
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 3, i, "key", null));
+    }
+
+    offsets = new HashMap<>();
+    offsets.put(new TopicPartition(topic, 0), Pair.of(0L, 25L));
+    offsets.put(new TopicPartition(topic, 1), Pair.of(0L, 25L));
+    offsets.put(new TopicPartition(topic, 2), Pair.of(0L, 25L));
+    offsets.put(new TopicPartition(topic, 3), Pair.of(0L, 25L));
+
+    when(records.isEmpty()).thenReturn(false);
+    when(records.iterator()).thenReturn(returnedRecords.iterator());
+    when(mockedConsumer.poll(Matchers.anyLong()))
+        //request for the first poll
+        .thenReturn(null)
+        //fail twice
+        .thenThrow(new TimeoutException("fail1"))
+        .thenThrow(new TimeoutException("fail2"))
+        //request that will give data
+        .thenReturn(records)
+        // shows to stop retrieving data
+        .thenReturn(null);
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      count++;
+    }
+
+    //should have gotten every stubbed record (25 for each of the 4 partitions)
+    assertThat(count, is(returnedRecords.size()));
+  }
+
+  /**
+   * Stubs the consumer to return, for every partition, a single record whose offset is one past
+   * the stop offset configured in {@code setup()}. No record may be emitted by the iterable.
+   */
+  @Test
+  public void iterateValuesAfterStopOffsets() {
+    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
+      returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
+          entry.getKey().partition(), entry.getValue() + 1, "key", null));
+    }
+
+    when(records.isEmpty()).thenReturn(false);
+    // NOTE(review): the same Iterator instance is handed out on every records.iterator() call,
+    // so it is already exhausted if it is requested a second time.
+    when(records.iterator()).thenReturn(returnedRecords.iterator());
+    when(mockedConsumer.poll(Matchers.anyLong())).thenReturn(records).thenReturn(records).thenReturn(null);
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      count++;
+    }
+
+    assertThat(count, is(0));
+
+  }
+
+  /**
+   * Stubs the consumer so that, after an initial {@code null} poll result, every subsequent poll
+   * throws a TimeoutException. The test expects the failure to surface to the caller as a
+   * RetriableException once the iterable stops retrying.
+   * NOTE(review): presumably TimeoutException is a RetriableException subtype; confirm.
+   */
+  @Test(expected = RetriableException.class)
+  public void iterateRetriableExceptionMaxExceeded() {
+    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
+      returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
+          entry.getKey().partition(), entry.getValue() + 1, "key", null));
+    }
+
+    when(records.isEmpty()).thenReturn(false);
+    when(records.iterator()).thenReturn(returnedRecords.iterator());
+    when(mockedConsumer.poll(Matchers.anyLong()))
+        //for the first poll call
+        .thenReturn(null)
+        //retry 5 times then fail
+        .thenThrow(new TimeoutException("fail1"))
+        .thenThrow(new TimeoutException("fail2"))
+        .thenThrow(new TimeoutException("fail3"))
+        .thenThrow(new TimeoutException("fail4"))
+        .thenThrow(new TimeoutException("fail5"))
+        .thenThrow(new TimeoutException("fail6"));
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+    data.iterator().next();
+  }
+
+  /**
+   * Looks up the broker offsets of {@code topic} for {@code OffsetRequest.LatestTime()}.
+   * The tests use the result as the stop offset of each partition.
+   */
+  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
+  }
+
+  /**
+   * Looks up the broker offsets of {@code topic} for {@code OffsetRequest.EarliestTime()}.
+   * The tests use the result as the start offset of each partition.
+   */
+  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
new file mode 100644
index 0000000..3800c24
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+
+public class KafkaSourceIT {
+
+  @Rule
+  public TemporaryPath path = new TemporaryPath();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private Properties consumerProps;
+  private Configuration config;
+  private String topic;
+
+  /**
+   * Runs once before any test in this class and delegates to {@code ClusterTest.startTest()}.
+   * NOTE(review): presumably this brings up the shared test cluster; confirm in ClusterTest.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  /**
+   * Runs once after all tests in this class and delegates to {@code ClusterTest.endTest()}.
+   * NOTE(review): presumably this tears down whatever {@code ClusterTest.startTest()} started; confirm.
+   */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  /**
+   * Prepares per-test state: names the topic after the running test method and fetches the
+   * consumer properties and consumer configuration from ClusterTest.
+   * NOTE(review): both tests in this class declare a local {@code config} that shadows the field
+   * assigned here, so the field value appears to be unused.
+   */
+  @Before
+  public void setupTest() {
+    topic = testName.getMethodName();
+    consumerProps = ClusterTest.getConsumerProperties();
+    config = ClusterTest.getConsumerConfig();
+  }
+
+  /**
+   * Writes data to the topic, reads it back through a KafkaSource by materializing the PTable,
+   * and checks that every record key is one of the written keys and that the number of records
+   * and of distinct keys both match the number of written keys.
+   */
+  @Test
+  public void sourceReadData() {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    // Pair each partition's earliest offset with its latest offset.
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, ClusterTest.getConf());
+    pipeline.enableDebug();
+
+    TableSource<BytesWritable, BytesWritable> kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    PTable<BytesWritable, BytesWritable> read = pipeline.read(kafkaSource);
+
+    Set<String> keysRead = new HashSet<>();
+    int numRecordsFound = 0;
+    for (Pair<BytesWritable, BytesWritable> values : read.materialize()) {
+      // getBytes() exposes the whole backing array, which may be larger than the payload;
+      // only the first getLength() bytes are valid data.
+      BytesWritable rawKey = values.first();
+      String key = new String(rawKey.getBytes(), 0, rawKey.getLength());
+      assertThat(keys, hasItem(key));
+      numRecordsFound++;
+      keysRead.add(key);
+    }
+
+    assertThat(numRecordsFound, is(keys.size()));
+    assertThat(keysRead.size(), is(keys.size()));
+
+    pipeline.done();
+  }
+
+
+  /**
+   * Writes data to the topic, runs a pipeline that reads it through a KafkaSource, converts each
+   * record key to a String and writes the keys to a text file, then reads the file back and checks
+   * that every persisted key is one of the written keys and that the counts match.
+   */
+  @Test
+  public void sourceReadDataThroughPipeline() {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    // Pair each partition's earliest offset with its latest offset.
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    // Note: this local shadows the 'config' field assigned in setupTest().
+    Configuration config = ClusterTest.getConf();
+
+    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
+    pipeline.enableDebug();
+
+    TableSource<BytesWritable, BytesWritable> kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    PTable<BytesWritable, BytesWritable> read = pipeline.read(kafkaSource);
+    Path out = path.getPath("out");
+    read.parallelDo(new SimpleConvertFn(), Avros.strings()).write(To.textFile(out));
+
+    // The output must be written before it is read back below.
+    pipeline.run();
+
+    PCollection<String> persistedKeys = pipeline.read(From.textFile(out));
+
+    Set<String> keysRead = new HashSet<>();
+    int numRecordsFound = 0;
+    for (String value : persistedKeys.materialize()) {
+      assertThat(keys, hasItem(value));
+      numRecordsFound++;
+      keysRead.add(value);
+    }
+
+    assertThat(numRecordsFound, is(keys.size()));
+    assertThat(keysRead.size(), is(keys.size()));
+
+    pipeline.done();
+  }
+
+
+  private static class SimpleConvertFn extends MapFn<Pair<BytesWritable, BytesWritable>, String> {
+    @Override
+    public String map(Pair<BytesWritable, BytesWritable> input) {
+      return new String(input.first().getBytes());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
new file mode 100644
index 0000000..38c3fce
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.cluster.Broker;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+
+public class KafkaUtilsIT {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private String topic;
+  private static Broker broker;
+
+  /**
+   * Runs once before any test in this class: calls {@code ClusterTest.startTest()} and builds the
+   * shared {@code broker} from the first {@code host:port} entry of the consumer's
+   * bootstrap-servers property.
+   */
+  @BeforeClass
+  public static void startup() throws Exception {
+    ClusterTest.startTest();
+
+    String bootstrapServers = ClusterTest.getConsumerProperties()
+        .getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+    // Only the first entry of the comma-separated list is used; it is split into host and port.
+    String[] hostAndPort = bootstrapServers.split(",")[0].split(":");
+
+    broker = new Broker(0, hostAndPort[0], Integer.parseInt(hostAndPort[1]), SecurityProtocol.PLAINTEXT);
+  }
+
+  /**
+   * Runs once after all tests in this class and delegates to {@code ClusterTest.endTest()}.
+   * NOTE(review): presumably this tears down whatever {@code ClusterTest.startTest()} started; confirm.
+   */
+  @AfterClass
+  public static void shutdown() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  /** Derives a per-test topic name by prefixing the running test method's name with "topic-". */
+  @Before
+  public void setup() throws IOException {
+    topic = "topic-" + testName.getMethodName();
+  }
+
+  /**
+   * A property set on a Hadoop Configuration must appear, with the same key and value, in the
+   * Properties returned by {@code KafkaUtils.getKafkaConnectionProperties}.
+   */
+  @Test
+  public void getKafkaProperties() {
+    // 'false' creates the Configuration without loading the default resources.
+    Configuration config = new Configuration(false);
+    String propertyKey = "fake.kafka.property";
+    String propertyValue = testName.getMethodName();
+    config.set(propertyKey, propertyValue);
+
+    Properties props = KafkaUtils.getKafkaConnectionProperties(config);
+    assertThat(props.get(propertyKey), is((Object) propertyValue));
+  }
+
+  /**
+   * A property present in the Properties passed to {@code KafkaUtils.addKafkaConnectionProperties}
+   * must be readable from the target Configuration under the same key.
+   */
+  @Test
+  public void addKafkaProperties() {
+    String propertyKey = "fake.kafka.property";
+    String propertyValue = testName.getMethodName();
+
+    Properties props = new Properties();
+    props.setProperty(propertyKey, propertyValue);
+
+    // 'false' creates the Configuration without loading the default resources.
+    Configuration config = new Configuration(false);
+
+    KafkaUtils.addKafkaConnectionProperties(props, config);
+    assertThat(config.get(propertyKey), is(propertyValue));
+  }
+
+
+  /** Requesting broker offsets with {@code null} Properties must fail with IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  public void getBrokerOffsetsKafkaNullProperties() throws IOException {
+    // The cast selects the Properties overload for the null argument.
+    KafkaUtils.getBrokerOffsets((Properties) null, kafka.api.OffsetRequest.LatestTime(), topic);
+  }
+
+  /** Requesting broker offsets with a {@code null} topics array must fail with IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  public void getBrokerOffsetsKafkaNullTopics() throws IOException {
+    // The cast passes null as the whole topics array rather than as a single null topic.
+    KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime(), (String[]) null);
+  }
+
+  /** Requesting broker offsets without naming any topic must fail with IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  public void getBrokerOffsetsKafkaEmptyTopics() throws IOException {
+    KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime());
+  }
+
+  /**
+   * Writes data, then polls the latest broker offsets until partitions 0-3 each report offset 1.
+   * The loop has no exit of its own other than success; the 10 second JUnit timeout bounds it.
+   */
+  @Test(timeout = 10000)
+  public void getLatestBrokerOffsetsKafka() throws IOException, InterruptedException {
+    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
+    while (true) {
+      Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
+          kafka.api.OffsetRequest.LatestTime(), topic);
+
+      assertNotNull(offsets);
+      assertThat(offsets.size(), is(4));
+      boolean allMatch = true;
+      for (int i = 0; i < 4; i++) {
+        TopicPartition tp = new TopicPartition(topic, i);
+        assertThat(offsets.keySet(), hasItem(tp));
+        // Long compared with a long literal: the value is unboxed, so this is a numeric comparison.
+        allMatch &= (offsets.get(tp) == 1L);
+      }
+      if (allMatch) {
+        break;
+      }
+      // Not every partition has reached the expected offset yet; wait briefly and ask again.
+      Thread.sleep(100L);
+    }
+  }
+
+  /**
+   * After writing data, the earliest broker offset of each of partitions 0-3 must be 0.
+   */
+  @Test
+  public void getEarliestBrokerOffsetsKafka() throws IOException {
+    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 1);
+
+    Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
+        kafka.api.OffsetRequest.EarliestTime(), topic);
+
+    assertNotNull(offsets);
+    //the topic is expected to have 4 partitions
+    assertThat(offsets.size(), is(4));
+    for (int i = 0; i < 4; i++) {
+      assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
+      assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
+    }
+  }
+
+  /**
+   * Requesting offsets for a timestamp that precedes the topic's creation must yield offset 0
+   * for each of partitions 0-3.
+   */
+  @Test
+  public void getBrokerOffsetsKafkaWithTimeBeforeTopicExists() throws IOException {
+    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
+
+    // A time of 1L (1 ms after epoch) should be before the topic was created
+    Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), 1L, topic);
+
+    assertNotNull(offsets);
+    //the topic is expected to have 4 partitions
+    assertThat(offsets.size(), is(4));
+    for (int i = 0; i < 4; i++) {
+      assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
+      assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
+    }
+  }
+
+  /**
+   * Points both the bootstrap-servers property and "metadata.broker.list" at dummy hosts and
+   * expects the offset lookup to fail with IllegalStateException.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void getBrokerOffsetsNoHostAvailable() throws IOException {
+    Properties testProperties = ClusterTest.getConsumerProperties();
+    testProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
+    testProperties.setProperty("metadata.broker.list", "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
+    // The call is expected to throw, so the assertion is not expected to be evaluated.
+    assertNotNull(KafkaUtils.getBrokerOffsets(testProperties, kafka.api.OffsetRequest.LatestTime(), topic));
+  }
+
+  /**
+   * With one real broker and one dummy broker in the list, the offset lookup must still return a
+   * result, whichever of the two comes first.
+   */
+  @Test
+  public void getBrokerOffsetsSomeHostsUnavailable() throws IOException {
+    // NOTE(review): the dummy broker is created with id 0, the same id as the real broker; confirm this is intended.
+    final Broker bad = new Broker(0, "dummyBrokerHost1", 0, SecurityProtocol.PLAINTEXT);
+    assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(broker, bad), kafka.api.OffsetRequest.LatestTime(), topic));
+    assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(bad, broker), kafka.api.OffsetRequest.LatestTime(), topic));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
new file mode 100644
index 0000000..d760a02
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaInputFormatIT {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Mock
+  private TaskAttemptContext taskContext;
+
+  @Mock
+  private FormatBundle bundle;
+  private Properties consumerProps;
+  private Configuration config;
+  private String topic;
+
+  /**
+   * Runs once before any test in this class; delegates to {@code ClusterTest.startTest()}.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  /**
+   * Runs once after all tests in this class; delegates to {@code ClusterTest.endTest()}.
+   */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  /**
+   * Per-test fixture: names the topic after the running test method and sets the key and value
+   * deserializer to {@code KafkaSource.BytesDeserializer} in both the consumer properties and the
+   * Hadoop configuration.
+   */
+  @Before
+  public void setupTest() {
+    final String bytesDeserializer = KafkaSource.BytesDeserializer.class.getName();
+
+    topic = testName.getMethodName();
+
+    consumerProps = ClusterTest.getConsumerProperties();
+    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, bytesDeserializer);
+    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, bytesDeserializer);
+
+    config = ClusterTest.getConsumerConfig();
+    config.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, bytesDeserializer);
+    config.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, bytesDeserializer);
+  }
+
+  /**
+   * Populates the topic, records the broker's earliest and latest offsets per partition in the
+   * configuration, and checks that {@code getSplits} returns one split per recorded partition with
+   * matching start and end offsets.
+   */
+  @Test
+  public void getSplitsFromFormat() throws IOException, InterruptedException {
+    // only the write itself matters here; the returned keys are not inspected by this test
+    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    // pair every partition's starting offset with its ending offset
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    KafkaInputFormat inputFormat = new KafkaInputFormat();
+    inputFormat.setConf(config);
+    List<InputSplit> splits = inputFormat.getSplits(null);
+
+    assertThat(splits.size(), is(offsets.size()));
+
+    // each split must carry the offsets that were recorded for its partition
+    for (InputSplit split : splits) {
+      KafkaInputSplit inputSplit = (KafkaInputSplit) split;
+      Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition());
+      assertThat(inputSplit.getStartingOffset(), is(startEnd.first()));
+      assertThat(inputSplit.getEndingOffset(), is(startEnd.second()));
+    }
+  }
+
+  @Test
+  public void getSplitsSameStartEnd() throws IOException, InterruptedException {
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for(int i = 0; i < 10; i++) {
+      offsets.put(new TopicPartition(topic, i), Pair.of((long)i, (long)i));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    KafkaInputFormat inputFormat = new KafkaInputFormat();
+    inputFormat.setConf(config);
+    List<InputSplit> splits = inputFormat.getSplits(null);
+
+    assertThat(splits.size(), is(0));
+  }
+
+  /**
+   * Builds splits from the broker's earliest and latest offsets, then reads every split through a record
+   * reader created by the input format. Checks that each key read is one of the keys written, that each
+   * split yields as many records as its offset range spans, and that the number of distinct keys read
+   * equals the number of keys written.
+   */
+  @Test
+  public void getSplitsCreateReaders() throws IOException, InterruptedException {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    KafkaInputFormat inputFormat = new KafkaInputFormat();
+    inputFormat.setConf(config);
+    List<InputSplit> splits = inputFormat.getSplits(null);
+
+    assertThat(splits.size(), is(offsets.size()));
+
+    for (InputSplit split : splits) {
+      KafkaInputSplit inputSplit = (KafkaInputSplit) split;
+      Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition());
+      assertThat(inputSplit.getStartingOffset(), is(startEnd.first()));
+      assertThat(inputSplit.getEndingOffset(), is(startEnd.second()));
+    }
+
+    //create readers and consume the data
+    when(taskContext.getConfiguration()).thenReturn(config);
+    Set<String> keysRead = new HashSet<>();
+    //read all data from all splits
+    for (InputSplit split : splits) {
+      KafkaInputSplit inputSplit = (KafkaInputSplit) split;
+      long start = inputSplit.getStartingOffset();
+      long end = inputSplit.getEndingOffset();
+
+      RecordReader<BytesWritable, BytesWritable> recordReader = inputFormat.createRecordReader(split, taskContext);
+      recordReader.initialize(split, taskContext);
+
+      int numRecordsFound = 0;
+      while (recordReader.nextKeyValue()) {
+        // getBytes() exposes the backing array, which may be longer than the payload; only the first
+        // getLength() bytes belong to the key. Decoding still uses the platform charset, as before.
+        BytesWritable currentKey = recordReader.getCurrentKey();
+        String key = new String(currentKey.getBytes(), 0, currentKey.getLength());
+        keysRead.add(key);
+        assertThat(keys, hasItem(key));
+        assertThat(recordReader.getCurrentValue(), is(notNullValue()));
+        numRecordsFound++;
+      }
+      recordReader.close();
+
+      //assert that it encountered a partitions worth of data
+      assertThat(((long) numRecordsFound), is(end - start));
+    }
+
+    //validate the same number of unique keys was read as were written.
+    assertThat(keysRead.size(), is(keys.size()));
+  }
+
+  /**
+   * Verifies the entries {@code writeOffsetsToBundle} stores on the bundle for a single topic: one entry
+   * listing the topic's partitions plus a start and an end entry for every partition.
+   */
+  @Test
+  public void writeOffsetsToFormatBundle() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    // distinct local name so the 'topic' field is not shadowed
+    String topicName = testName.getMethodName();
+    int numPartitions = 10;
+    for (int i = 0; i < numPartitions; i++) {
+      TopicPartition tAndP = new TopicPartition(topicName, i);
+      offsets.put(tAndP, Pair.of((long) i, i * 10L));
+    }
+
+    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
+
+    //number of Partitions * 2 for start and end + 1 for the topic
+    verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture());
+
+    // both captors were filled by the same calls, so a key's index locates its value
+    List<String> keyValues = keyCaptor.getAllValues();
+    List<String> valueValues = valueCaptor.getAllValues();
+
+    // the topic-level entry does not vary per partition, so it is checked once
+    String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topicName);
+    assertThat(keyValues, hasItem(partitionKey));
+
+    String partitions = valueValues.get(keyValues.indexOf(partitionKey));
+    List<String> parts = Arrays.asList(partitions.split(","));
+
+    for (int i = 0; i < numPartitions; i++) {
+      String startKey = KafkaInputFormat.generatePartitionStartKey(topicName, i);
+      String endKey = KafkaInputFormat.generatePartitionEndKey(topicName, i);
+      assertThat(keyValues, hasItem(startKey));
+      assertThat(keyValues, hasItem(endKey));
+      assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i)));
+      assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L)));
+      assertThat(parts, hasItem(Long.toString(i)));
+    }
+  }
+
+  /**
+   * Verifies the entries {@code writeOffsetsToBundle} stores on the bundle when the topic name itself
+   * starts with {@code "partitions."}: one entry listing the topic's partitions plus a start and an end
+   * entry for every partition.
+   */
+  @Test
+  public void writeOffsetsToFormatBundleSpecialCharacters() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    // distinct local name so the 'topic' field is not shadowed
+    String topicName = "partitions." + testName.getMethodName();
+    int numPartitions = 10;
+    for (int i = 0; i < numPartitions; i++) {
+      TopicPartition tAndP = new TopicPartition(topicName, i);
+      offsets.put(tAndP, Pair.of((long) i, i * 10L));
+    }
+
+    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
+
+    //number of Partitions * 2 for start and end + 1 for the topic
+    verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture());
+
+    // both captors were filled by the same calls, so a key's index locates its value
+    List<String> keyValues = keyCaptor.getAllValues();
+    List<String> valueValues = valueCaptor.getAllValues();
+
+    // the topic-level entry does not vary per partition, so it is checked once
+    String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topicName);
+    assertThat(keyValues, hasItem(partitionKey));
+
+    String partitions = valueValues.get(keyValues.indexOf(partitionKey));
+    List<String> parts = Arrays.asList(partitions.split(","));
+
+    for (int i = 0; i < numPartitions; i++) {
+      String startKey = KafkaInputFormat.generatePartitionStartKey(topicName, i);
+      String endKey = KafkaInputFormat.generatePartitionEndKey(topicName, i);
+      assertThat(keyValues, hasItem(startKey));
+      assertThat(keyValues, hasItem(endKey));
+      assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i)));
+      assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L)));
+      assertThat(parts, hasItem(Long.toString(i)));
+    }
+  }
+
+  /**
+   * Verifies the entries {@code writeOffsetsToBundle} stores on the bundle when the offsets span several
+   * topics: per topic, one entry listing its partitions plus a start and an end entry for every partition.
+   */
+  @Test
+  public void writeOffsetsToFormatBundleMultipleTopics() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    Set<String> topics = new HashSet<>();
+
+    int numPartitions = 10;
+    int numTopics = 10;
+    for (int j = 0; j < numTopics; j++) {
+      // distinct local name so the 'topic' field is not shadowed
+      String topicName = testName.getMethodName() + j;
+      topics.add(topicName);
+      for (int i = 0; i < numPartitions; i++) {
+        TopicPartition tAndP = new TopicPartition(topicName, i);
+        offsets.put(tAndP, Pair.of((long) i, i * 10L));
+      }
+    }
+
+    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
+
+    //number of Partitions * 2 for start and end + num of topics
+    verify(bundle, times((numTopics * numPartitions * 2) + numTopics)).set(keyCaptor.capture(), valueCaptor.capture());
+
+    // both captors were filled by the same calls, so a key's index locates its value
+    List<String> keyValues = keyCaptor.getAllValues();
+    List<String> valueValues = valueCaptor.getAllValues();
+
+    for (String topicName : topics) {
+
+      // the topic-level entry does not vary per partition, so it is checked once per topic
+      String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topicName);
+      assertThat(keyValues, hasItem(partitionKey));
+
+      String partitions = valueValues.get(keyValues.indexOf(partitionKey));
+      List<String> parts = Arrays.asList(partitions.split(","));
+
+      for (int i = 0; i < numPartitions; i++) {
+        String startKey = KafkaInputFormat.generatePartitionStartKey(topicName, i);
+        String endKey = KafkaInputFormat.generatePartitionEndKey(topicName, i);
+        assertThat(keyValues, hasItem(startKey));
+        assertThat(keyValues, hasItem(endKey));
+        assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i)));
+        assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L)));
+        assertThat(parts, hasItem(Long.toString(i)));
+      }
+    }
+  }
+
+  /**
+   * Round-trips offsets for several topics through a {@link Configuration}: whatever
+   * {@code writeOffsetsToConfiguration} stores, {@code getOffsets} should return unchanged.
+   */
+  @Test
+  public void getOffsetsFromConfig() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+
+    int numPartitions = 10;
+    int numTopics = 10;
+    for (int j = 0; j < numTopics; j++) {
+      // distinct local name so the 'topic' field is not shadowed
+      String topicName = testName.getMethodName() + ".partitions" + j;
+      for (int i = 0; i < numPartitions; i++) {
+        TopicPartition tAndP = new TopicPartition(topicName, i);
+        offsets.put(tAndP, Pair.of((long) i, i * 10L));
+      }
+    }
+
+    // a fresh configuration (created with loadDefaults = false) instead of the shared 'config' field
+    Configuration conf = new Configuration(false);
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, conf);
+
+    Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(conf);
+
+    // compare with the expected map; comparing the result's size with itself could never fail
+    assertThat(returnedOffsets.size(), is(offsets.size()));
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      Pair<Long, Long> valuePair = returnedOffsets.get(entry.getKey());
+      assertThat(valuePair, is(entry.getValue()));
+    }
+  }
+
+  /**
+   * Removing one partition's start entry from the configuration should make {@code getOffsets} fail
+   * with an {@link IllegalStateException}.
+   */
+  @Test(expected=IllegalStateException.class)
+  public void getOffsetsFromConfigMissingStart() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    Set<String> topics = new HashSet<>();
+
+    int numPartitions = 10;
+    int numTopics = 10;
+    for (int j = 0; j < numTopics; j++) {
+      // distinct local name so the 'topic' field is not shadowed
+      String topicName = testName.getMethodName() + ".partitions" + j;
+      topics.add(topicName);
+      for (int i = 0; i < numPartitions; i++) {
+        TopicPartition tAndP = new TopicPartition(topicName, i);
+        offsets.put(tAndP, Pair.of((long) i, i * 10L));
+      }
+    }
+
+    // a fresh configuration (created with loadDefaults = false) instead of the shared 'config' field
+    Configuration conf = new Configuration(false);
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, conf);
+
+    // drop the start entry of partition 0 for whichever topic the set yields first
+    conf.unset("org.apache.crunch.kafka.offsets.topic." + topics.iterator().next() + ".partitions.0.start");
+
+    // expected to throw, so the return value is not captured
+    KafkaInputFormat.getOffsets(conf);
+  }
+
+  /**
+   * Removing one partition's end entry from the configuration should make {@code getOffsets} fail
+   * with an {@link IllegalStateException}.
+   */
+  @Test(expected=IllegalStateException.class)
+  public void getOffsetsFromConfigMissingEnd() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    Set<String> topics = new HashSet<>();
+
+    int numPartitions = 10;
+    int numTopics = 10;
+    for (int j = 0; j < numTopics; j++) {
+      // distinct local name so the 'topic' field is not shadowed
+      String topicName = testName.getMethodName() + ".partitions" + j;
+      topics.add(topicName);
+      for (int i = 0; i < numPartitions; i++) {
+        TopicPartition tAndP = new TopicPartition(topicName, i);
+        offsets.put(tAndP, Pair.of((long) i, i * 10L));
+      }
+    }
+
+    // a fresh configuration (created with loadDefaults = false) instead of the shared 'config' field
+    Configuration conf = new Configuration(false);
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, conf);
+
+    // drop the end entry of partition 0 for whichever topic the set yields first
+    conf.unset("org.apache.crunch.kafka.offsets.topic." + topics.iterator().next() + ".partitions.0.end");
+
+    // expected to throw, so the return value is not captured
+    KafkaInputFormat.getOffsets(conf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
new file mode 100644
index 0000000..3833e9d
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import kafka.api.OffsetRequest;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the values a {@link KafkaInputSplit} reports after construction.
+ */
+public class KafkaInputSplitTest {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * A split built from explicit offsets reports them back, with a length equal to their difference
+   * and an empty locations array.
+   */
+  @Test
+  public void createSplit() throws IOException, InterruptedException {
+    final String topicName = testName.getMethodName();
+    final int partitionId = 18;
+    final long firstOffset = 10;
+    final long lastOffset = 23;
+    final TopicPartition expectedPartition = new TopicPartition(topicName, partitionId);
+
+    KafkaInputSplit split = new KafkaInputSplit(topicName, partitionId, firstOffset, lastOffset);
+
+    assertThat(split.getStartingOffset(), is(firstOffset));
+    assertThat(split.getEndingOffset(), is(lastOffset));
+    assertThat(split.getTopicPartition(), is(expectedPartition));
+    assertThat(split.getLength(), is(lastOffset - firstOffset));
+    assertThat(split.getLocations(), is(new String[0]));
+  }
+
+  /**
+   * A split built with a starting offset of {@code -1} reports that value back unchanged, and its
+   * length is expected to equal the ending offset.
+   */
+  @Test
+  public void createSplitEarliestOffset() throws IOException, InterruptedException {
+    final String topicName = testName.getMethodName();
+    final int partitionId = 18;
+    final long lastOffset = 23;
+    final TopicPartition expectedPartition = new TopicPartition(topicName, partitionId);
+
+    KafkaInputSplit split = new KafkaInputSplit(topicName, partitionId, -1L, lastOffset);
+
+    assertThat(split.getStartingOffset(), is(-1L));
+    assertThat(split.getEndingOffset(), is(lastOffset));
+    assertThat(split.getTopicPartition(), is(expectedPartition));
+    assertThat(split.getLength(), is(lastOffset));
+    assertThat(split.getLocations(), is(new String[0]));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
new file mode 100644
index 0000000..ba5b65b
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+import static org.mockito.Mockito.when;
+
+/**
+ * Integration test that writes data through {@code ClusterTest} and reads it back, one partition at a
+ * time, with a {@link KafkaRecordReader}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordReaderIT {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Mock
+  private TaskAttemptContext context;
+
+  private String topic;
+  private Properties consumerProps;
+  private Configuration config;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  /**
+   * Per-test fixture: names the topic after the running test method and makes the mocked task
+   * context hand out the consumer configuration.
+   */
+  @Before
+  public void setupTest() {
+    topic = testName.getMethodName();
+    consumerProps = ClusterTest.getConsumerProperties();
+    config = ClusterTest.getConsumerConfig();
+    when(context.getConfiguration()).thenReturn(config);
+  }
+
+  /**
+   * Reads every partition's offset range with its own record reader and checks that each key read is
+   * one of the keys written, that each partition yields as many records as its range spans, and that
+   * the number of distinct keys read equals the number of keys written.
+   */
+  @Test
+  public void readData() throws IOException, InterruptedException {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+
+    Map<TopicPartition, Long> earliest = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> latest = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    // pair every partition's earliest offset with its latest offset
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (TopicPartition topicPartition : earliest.keySet()) {
+      offsets.put(topicPartition, Pair.of(earliest.get(topicPartition), latest.get(topicPartition)));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    Set<String> keysRead = new HashSet<>();
+    //read all data from all splits
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
+      TopicPartition topicPartition = partitionInfo.getKey();
+      long start = partitionInfo.getValue().first();
+      long end = partitionInfo.getValue().second();
+      KafkaInputSplit split = new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), start, end);
+
+      KafkaRecordReader<String, String> reader = new KafkaRecordReader<>();
+      reader.initialize(split, context);
+
+      long recordsFound = 0;
+      while (reader.nextKeyValue()) {
+        String key = reader.getCurrentKey();
+        keysRead.add(key);
+        assertThat(keys, hasItem(key));
+        assertThat(reader.getCurrentValue(), is(notNullValue()));
+        recordsFound++;
+      }
+      reader.close();
+
+      //assert that it encountered a partitions worth of data
+      assertThat(recordsFound, is(end - start));
+    }
+
+    //validate the same number of unique keys was read as were written.
+    assertThat(keysRead.size(), is(keys.size()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java
new file mode 100644
index 0000000..ede3cf0
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * <p>
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Embedded Zookeeper instance for testing purposes.
+ * <p>
+ * Adapted from the {@code kafka.zk.EmbeddedZookeeper} class.
+ * </p>
+ */
+class EmbeddedZookeeper {
+
+  private final File snapshotDir;
+  private final File logDir;
+  private final NIOServerCnxnFactory factory;
+
+  /**
+   * Constructs an embedded Zookeeper instance and starts it on the given address, keeping its snapshot
+   * and log data in two temporary directories.
+   *
+   * @param connectString Zookeeper connection string in {@code host:port} form.
+   *
+   * @throws IOException if an error occurs during Zookeeper initialization, including the case where
+   *                     the calling thread is interrupted during startup.
+   */
+  public EmbeddedZookeeper(String connectString) throws IOException {
+    this.snapshotDir = KafkaTestUtils.getTempDir();
+    this.logDir = KafkaTestUtils.getTempDir();
+    this.factory = new NIOServerCnxnFactory();
+    // split once; the first element is the host and the second the port
+    String[] hostAndPort = connectString.split(":");
+    String hostname = hostAndPort[0];
+    int port = Integer.parseInt(hostAndPort[1]);
+    int maxClientConnections = 1024;
+    factory.configure(new InetSocketAddress(hostname, port), maxClientConnections);
+    try {
+      int tickTime = 500;
+      factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
+    } catch (InterruptedException e) {
+      // restore the interrupt flag so callers can still observe the interruption
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Shuts down the embedded Zookeeper instance and deletes its snapshot and log directories. Each
+   * directory deletion is attempted even if an earlier step throws.
+   *
+   * @throws IOException if a directory cannot be deleted.
+   */
+  public void shutdown() throws IOException {
+    try {
+      factory.shutdown();
+    } finally {
+      try {
+        FileUtils.deleteDirectory(snapshotDir);
+      } finally {
+        FileUtils.deleteDirectory(logDir);
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
new file mode 100644
index 0000000..f47f168
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.kafka.utils;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.apache.commons.io.FileUtils;
+import scala.Option;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static scala.collection.JavaConversions.asJavaIterable;
+
+/**
+ * A test harness that brings up some number of Kafka broker nodes.
+ * <p>
+ * Adapted from the {@code kafka.integration.KafkaServerTestHarness} class.
+ * </p>
+ */
+public class KafkaBrokerTestHarness extends ZookeeperTestHarness {
+
+  /**
+   * Producer send acknowledgment timeout in milliseconds.
+   */
+  public static final String KAFKA_PRODUCER_ACK_TIMEOUT_MILLIS = "request.timeout.ms";
+
+  /**
+   * Producer send retry maximum count.
+   */
+  public static final String KAFKA_PRODUCER_RETRY_MAX = "message.send.max.retries";
+
+  /**
+   * Producer send retry backoff interval in milliseconds.
+   */
+  public static final String KAFKA_PRODUCER_RETRY_INTERVAL_MILLIS = "retry.backoff.ms";
+
+  /**
+   * Comma-delimited Kafka Zookeeper quorum list.
+   */
+  public static final String KAFKA_ZOOKEEPERS = "zookeeper.connect";
+
+  /**
+   * Comma-delimited list of Kafka brokers, for producer bootstrapping purposes.
+   */
+  public static final String KAFKA_BROKERS = "metadata.broker.list";
+
+  /**
+   * Default number of brokers in the Kafka cluster.
+   */
+  public static final int DEFAULT_BROKERS = 1;
+
+  /**
+   * Default number of partitions per Kafka topic.
+   */
+  public static final int PARTITIONS_PER_TOPIC = 4;
+
+  // Configuration for each broker in the cluster; supplied at construction time.
+  private List<KafkaConfig> brokerConfigs;
+  // Started brokers; null until setUp() creates the list.
+  private List<KafkaServer> brokers;
+  // Client configuration file; only assigned by setUp().
+  private File clientConfig;
+  // Lifecycle flags used by setUp() and tearDown() to reject repeated or out-of-order calls.
+  private boolean setUp;
+  private boolean tornDown;
+
+  /**
+   * Creates a new Kafka broker test harness using the {@link #DEFAULT_BROKERS default} number of brokers.
+   * <p>
+   * The Zookeeper port is the first port returned by {@code KafkaTestUtils.getPorts(1)}.
+   * </p>
+   */
+  public KafkaBrokerTestHarness() {
+    this(DEFAULT_BROKERS, KafkaTestUtils.getPorts(1)[0]);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the {@link #DEFAULT_BROKERS default} number of brokers and the supplied
+   * {@link Properties} which will be applied to the brokers.
+   * <p>
+   * The Zookeeper port is the first port returned by {@code KafkaTestUtils.getPorts(1)}.
+   * </p>
+   *
+   * @param properties
+   *            the additional {@link Properties} supplied to the brokers
+   * @throws IllegalArgumentException
+   *             if {@code properties} is {@code null}
+   */
+  public KafkaBrokerTestHarness(Properties properties) {
+    this(DEFAULT_BROKERS, KafkaTestUtils.getPorts(1)[0], properties);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the given number of brokers and Zookeeper port.
+   * <p>
+   * Broker configurations are generated by {@link #getBrokerConfig(int, int)}.
+   * </p>
+   *
+   * @param brokers Number of Kafka brokers to start up.
+   * @param zookeeperPort The port number to use for Zookeeper client connections.
+   *
+   * @throws IllegalArgumentException if {@code brokers} is less than 1.
+   */
+  public KafkaBrokerTestHarness(int brokers, int zookeeperPort) {
+    this(getBrokerConfig(brokers, zookeeperPort), zookeeperPort);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the given number of brokers, Zookeeper port and additional broker
+   * properties.
+   * <p>
+   * Broker configurations are generated by {@link #getBrokerConfig(int, int, Properties)}.
+   * </p>
+   *
+   * @param brokers
+   *            Number of Kafka brokers to start up.
+   * @param zookeeperPort
+   *            The port number to use for Zookeeper client connections.
+   * @param properties
+   *            the additional {@link Properties} supplied to the brokers
+   *
+   * @throws IllegalArgumentException
+   *             if {@code brokers} is less than 1 or if {@code properties} is {@code null}
+   */
+  public KafkaBrokerTestHarness(int brokers, int zookeeperPort, Properties properties) {
+    this(getBrokerConfig(brokers, zookeeperPort, properties), zookeeperPort);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness from explicit broker configurations.
+   * <p>
+   * No broker is started here; the harness stays idle until {@link #setUp()} is called.
+   * </p>
+   *
+   * @param brokerConfigs One configuration per Kafka broker to run.
+   * @param zookeeperPort The port number to use for Zookeeper client connections.
+   *
+   * @throws IllegalArgumentException if {@code brokerConfigs} is {@code null} or empty.
+   */
+  public KafkaBrokerTestHarness(List<KafkaConfig> brokerConfigs, int zookeeperPort) {
+    super(zookeeperPort);
+    boolean noConfigs = (brokerConfigs == null) || brokerConfigs.isEmpty();
+    if (noConfigs) {
+      throw new IllegalArgumentException("Must supply at least one broker configuration.");
+    }
+    // Fresh harness: nothing started, nothing torn down.
+    this.setUp = false;
+    this.tornDown = false;
+    this.brokers = null;
+    this.brokerConfigs = brokerConfigs;
+  }
+
+  /**
+   * Start up the Kafka broker cluster.
+   * <p>
+   * Zookeeper is started first, then one broker per configuration, and finally the broker and Zookeeper connection
+   * properties are written to a {@code kafka-config.xml} file in a new temporary directory.
+   * </p>
+   *
+   * @throws IOException if an error occurs during Kafka broker startup or while writing the client configuration.
+   * @throws IllegalStateException if the Kafka broker cluster has already been {@link #setUp() setup}.
+   */
+  @Override
+  public void setUp() throws IOException {
+    if (setUp) {
+      throw new IllegalStateException("Already setup, cannot setup again");
+    }
+    setUp = true;
+
+    // Start up zookeeper.
+    super.setUp();
+
+    brokers = new ArrayList<KafkaServer>(brokerConfigs.size());
+    for (KafkaConfig config : brokerConfigs) {
+      brokers.add(startBroker(config));
+    }
+
+    // Write out Kafka client config to a temp file.
+    clientConfig = new File(KafkaTestUtils.getTempDir(), "kafka-config.xml");
+    // Exit-time deletion runs in reverse registration order, so registering the file after its directory lets
+    // the (by then empty) directory be removed as well.
+    clientConfig.deleteOnExit();
+
+    // The combined properties do not change between iterations, so build them once.
+    Properties clientProps = getProps();
+    FileWriter writer = new FileWriter(clientConfig);
+    try {
+      writer.append("<configuration>");
+      for (String prop : Arrays.asList(KAFKA_BROKERS, KAFKA_ZOOKEEPERS)) {
+        writer.append("<property>");
+        writer.append("<name>").append(prop).append("</name>");
+        writer.append("<value>").append(clientProps.getProperty(prop)).append("</value>");
+        writer.append("</property>");
+      }
+      writer.append("</configuration>");
+    } finally {
+      // Always release the file handle, even when a write fails part-way through.
+      writer.close();
+    }
+  }
+
+  /**
+   * Shutdown the Kafka broker cluster. Attempting to {@link #setUp()} a cluster again after calling this method is not allowed;
+   * a new {@code KafkaBrokerTestHarness} must be created instead.
+   * <p>
+   * Zookeeper is always shut down, even if stopping a broker or deleting its log directories fails. Brokers are
+   * skipped when {@link #setUp()} failed before any of them could be started.
+   * </p>
+   *
+   * @throws IOException if a broker log directory could not be deleted or Zookeeper could not be shut down cleanly.
+   * @throws IllegalStateException if the Kafka broker cluster has already been {@link #tearDown() torn down} or has not been
+   *      {@link #setUp()}.
+   */
+  @Override
+  public void tearDown() throws IOException {
+    if (!setUp) {
+      throw new IllegalStateException("Not set up, cannot tear down");
+    }
+    if (tornDown) {
+      throw new IllegalStateException("Already torn down, cannot tear down again");
+    }
+    tornDown = true;
+
+    try {
+      // brokers is still null when setUp() failed while starting Zookeeper.
+      if (brokers != null) {
+        for (KafkaServer broker : brokers) {
+          broker.shutdown();
+        }
+
+        for (KafkaServer broker : brokers) {
+          for (String logDir : asJavaIterable(broker.config().logDirs())) {
+            FileUtils.deleteDirectory(new File(logDir));
+          }
+        }
+      }
+    } finally {
+      // Shutdown zookeeper
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Builds the properties a Kafka producer needs to talk to this cluster.
+   * <p>
+   * The broker list is a comma-separated sequence of {@code host:port} entries, one per broker configuration.
+   * </p>
+   *
+   * @return Producer properties.
+   */
+  public Properties getProducerProps() {
+    StringBuilder brokerList = new StringBuilder();
+    String separator = "";
+    for (KafkaConfig config : brokerConfigs) {
+      brokerList.append(separator).append(config.hostName()).append(":").append(config.port());
+      separator = ",";
+    }
+
+    Properties producerProps = new Properties();
+    producerProps.setProperty(KAFKA_BROKERS, brokerList.toString());
+    producerProps.setProperty(KAFKA_PRODUCER_ACK_TIMEOUT_MILLIS, "10000");
+
+    // Retry backoff and retry count are raised above their defaults so that a test which creates a topic and
+    // writes to it straight away still succeeds when auto.create.topics.enable is disabled.
+    producerProps.setProperty(KAFKA_PRODUCER_RETRY_INTERVAL_MILLIS, String.valueOf(500));
+    producerProps.setProperty(KAFKA_PRODUCER_RETRY_MAX, String.valueOf(10));
+
+    return producerProps;
+  }
+
+  /**
+   * Builds the properties a Kafka consumer needs to talk to this cluster.
+   * <p>
+   * Only the Zookeeper connection string is set.
+   * </p>
+   *
+   * @return Consumer properties.
+   */
+  public Properties getConsumerProps() {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(KAFKA_ZOOKEEPERS, zookeeperConnect);
+    return consumerProps;
+  }
+
+  /**
+   * Builds properties usable by either a Kafka producer or a Kafka consumer.
+   * <p>
+   * Consumer properties are applied after producer properties, so they win if a key appears in both.
+   * </p>
+   *
+   * @return Combined producer and consumer properties.
+   */
+  public Properties getProps() {
+    Properties combined = new Properties();
+    combined.putAll(getProducerProps());
+    combined.putAll(getConsumerProps());
+    return combined;
+  }
+
+  /**
+   * Returns configuration properties for each Kafka broker in the cluster.
+   * <p>
+   * Each entry is a fresh copy of the corresponding broker's configuration, so callers may modify the result freely.
+   * </p>
+   *
+   * @return Broker properties.
+   *
+   * @throws IllegalStateException if the brokers have not been started by {@link #setUp()} yet.
+   */
+  public List<Properties> getBrokerProps() {
+    // The broker list only exists once setUp() has run; fail with a clear message instead of a bare NPE.
+    if (brokers == null) {
+      throw new IllegalStateException("Kafka brokers are not active");
+    }
+    List<Properties> props = new ArrayList<Properties>(brokers.size());
+    for (KafkaServer broker : brokers) {
+      Properties prop = new Properties();
+      prop.putAll(broker.config().props());
+      props.add(prop);
+    }
+    return props;
+  }
+
+  /**
+   * Creates a collection of Kafka Broker configurations based on the number of brokers and zookeeper.
+   * <p>
+   * Equivalent to calling {@link #getBrokerConfig(int, int, Properties)} with an empty {@link Properties}.
+   * </p>
+   * @param brokers the number of brokers to create configuration for.
+   * @param zookeeperPort the zookeeper port for the brokers to connect to.
+   * @return configuration for a collection of brokers.
+   * @throws IllegalArgumentException if {@code brokers} is less than 1
+   */
+  public static List<KafkaConfig> getBrokerConfig(int brokers, int zookeeperPort) {
+    return getBrokerConfig(brokers, zookeeperPort, new Properties());
+  }
+
+  /**
+   * Builds one Kafka broker configuration per requested broker.
+   * <p>
+   * Every broker gets its own id (starting at 1), its own port, and its own temporary log directory, and points at
+   * Zookeeper on {@code localhost}. Entries in {@code baseProperties} are applied last and therefore override the
+   * generated defaults.
+   * </p>
+   * @param brokers the number of brokers to create configuration for.
+   * @param zookeeperPort the zookeeper port for the brokers to connect to.
+   * @param baseProperties basic properties that should be applied for each broker config.  These properties will be
+   *                       honored in favor of any default properties.
+   * @return configuration for a collection of brokers.
+   * @throws IllegalArgumentException if {@code brokers} is less than 1 or {@code baseProperties} is {@code null}.
+   */
+  public static List<KafkaConfig> getBrokerConfig(int brokers, int zookeeperPort, Properties baseProperties) {
+    if (brokers < 1) {
+      throw new IllegalArgumentException("Invalid broker count: " + brokers);
+    }
+    if (baseProperties == null) {
+      throw new IllegalArgumentException("The 'baseProperties' cannot be 'null'.");
+    }
+
+    int[] brokerPorts = KafkaTestUtils.getPorts(brokers);
+
+    List<KafkaConfig> brokerConfigs = new ArrayList<KafkaConfig>(brokers);
+    for (int brokerIndex = 0; brokerIndex < brokers; brokerIndex++) {
+      Properties brokerProps = new Properties();
+
+      // Generated defaults.
+      brokerProps.setProperty(KAFKA_ZOOKEEPERS, "localhost:" + zookeeperPort);
+      brokerProps.setProperty("broker.id", String.valueOf(brokerIndex + 1));
+      brokerProps.setProperty("host.name", "localhost");
+      brokerProps.setProperty("port", String.valueOf(brokerPorts[brokerIndex]));
+      brokerProps.setProperty("log.dir", KafkaTestUtils.getTempDir().getAbsolutePath());
+      brokerProps.setProperty("log.flush.interval.messages", String.valueOf(1));
+      brokerProps.setProperty("num.partitions", String.valueOf(PARTITIONS_PER_TOPIC));
+      brokerProps.setProperty("default.replication.factor", String.valueOf(brokers));
+      brokerProps.setProperty("auto.create.topics.enable", Boolean.FALSE.toString());
+
+      // Caller-supplied overrides win over the defaults above.
+      brokerProps.putAll(baseProperties);
+
+      brokerConfigs.add(new KafkaConfig(brokerProps));
+    }
+    return brokerConfigs;
+  }
+
+  /**
+   * Returns location of Kafka client configuration file containing broker and zookeeper connection properties.
+   * <p>
+   * This file can be loaded using the {@code -conf} command option to easily achieve Kafka connectivity.
+   * </p>
+   * <p>
+   * The file is only assigned by {@link #setUp()}; calling this method before then fails with a
+   * {@link NullPointerException}.
+   * </p>
+   *
+   * @return Kafka client configuration file path
+   */
+  public String getClientConfigPath() {
+    return clientConfig.getAbsolutePath();
+  }
+
+  /**
+   * Creates a broker for the given configuration, starts it, and returns it.
+   */
+  private static KafkaServer startBroker(KafkaConfig config) {
+    Option<String> none = Option.<String>empty();
+    KafkaServer broker = new KafkaServer(config, new SystemTime(), none);
+    broker.startup();
+    return broker;
+  }
+
+  /**
+   * {@link Time} implementation backed by the JVM clock and {@link Thread#sleep(long)}.
+   */
+  private static class SystemTime implements Time {
+    @Override
+    public long milliseconds() {
+      return System.currentTimeMillis();
+    }
+
+    @Override
+    public long nanoseconds() {
+      return System.nanoTime();
+    }
+
+    /**
+     * Sleeps for up to {@code ms} milliseconds. If the thread is interrupted the sleep ends early and the
+     * thread's interrupt status is restored so that callers can still observe the interruption.
+     */
+    @Override
+    public void sleep(long ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // The interface does not allow rethrowing, so re-assert the flag rather than losing the interrupt.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
new file mode 100644
index 0000000..f8eb2ff
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Assorted Kafka testing utility methods.
+ */
+public class KafkaTestUtils {
+
+  private static final Random RANDOM = new Random();
+  private static final String TEMP_DIR_PREFIX = "kafka-";
+
+  /** Number of random directory names tried before giving up. */
+  private static final int MAX_TEMP_DIR_ATTEMPTS = 10;
+
+  private static final Set<Integer> USED_PORTS = new HashSet<Integer>();
+
+  /**
+   * Creates and returns a new randomly named temporary directory under {@code java.io.tmpdir}.
+   * <p>
+   * The directory is registered for deletion on JVM exit, which only succeeds if it is empty by then; callers
+   * that put files in it are responsible for removing them.
+   * </p>
+   *
+   * @return a new temporary directory.
+   *
+   * @throws RuntimeException if a new temporary directory could not be created.
+   */
+  public static File getTempDir() {
+    File parent = new File(System.getProperty("java.io.tmpdir"));
+    File candidate = null;
+    // mkdirs() returns false when the directory already exists, so retry with a new name on a collision
+    // instead of failing on the first clash.
+    for (int attempt = 0; attempt < MAX_TEMP_DIR_ATTEMPTS; attempt++) {
+      candidate = new File(parent, TEMP_DIR_PREFIX + RANDOM.nextInt(10000000));
+      if (candidate.mkdirs()) {
+        candidate.deleteOnExit();
+        return candidate;
+      }
+    }
+    throw new RuntimeException("could not create temp directory: " + candidate.getAbsolutePath());
+  }
+
+  /**
+   * Returns an array containing the specified number of available local ports.
+   * <p>
+   * A port is never handed out twice by this class within the same JVM. All probe sockets are closed before
+   * returning, including when an error occurs.
+   * </p>
+   *
+   * @param count Number of local ports to identify and return.
+   *
+   * @return an array of available local port numbers.
+   *
+   * @throws RuntimeException if an I/O error occurs opening or closing a socket.
+   */
+  public static int[] getPorts(int count) {
+    int[] ports = new int[count];
+    Set<ServerSocket> openSockets = new HashSet<ServerSocket>(count + USED_PORTS.size());
+    IOException closeFailure = null;
+
+    try {
+      for (int i = 0; i < count; ) {
+        // Keep every probe socket open until the end so the OS cannot hand the same port back again.
+        ServerSocket socket = new ServerSocket(0);
+        openSockets.add(socket);
+        int port = socket.getLocalPort();
+
+        // Disallow port reuse.
+        if (USED_PORTS.add(port)) {
+          ports[i++] = port;
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("could not open socket", e);
+    } finally {
+      // Close the sockets so that their port numbers can be used by the caller. Every socket is attempted
+      // even if an earlier close fails.
+      for (ServerSocket socket : openSockets) {
+        try {
+          socket.close();
+        } catch (IOException e) {
+          if (closeFailure == null) {
+            closeFailure = e;
+          }
+        }
+      }
+    }
+
+    if (closeFailure != null) {
+      throw new RuntimeException("could not close socket", closeFailure);
+    }
+    return ports;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java
new file mode 100644
index 0000000..6ee102e
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.utils;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * A {@link ZkSerializer Zookeeper serializer} for {@link String} objects.
+ * <p>
+ * Ported from the {@code kafka.utils.ZKStringSerializer} scala object.
+ * </p>
+ */
+public class ZkStringSerializer implements ZkSerializer {
+
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  /**
+   * Encodes a {@link String} as UTF-8 bytes.
+   *
+   * @param data the {@link String} to encode; may be {@code null}.
+   * @return the UTF-8 bytes of {@code data}, or {@code null} if {@code data} is {@code null}, mirroring
+   *      {@link #deserialize(byte[])}.
+   * @throws ClassCastException if {@code data} is not a {@link String}.
+   */
+  @Override
+  public byte[] serialize(Object data) throws ZkMarshallingError {
+    return data != null ? ((String) data).getBytes(UTF_8) : null;
+  }
+
+  /**
+   * Decodes UTF-8 bytes into a {@link String}.
+   *
+   * @param bytes the bytes to decode; may be {@code null}.
+   * @return the decoded {@link String}, or {@code null} if {@code bytes} is {@code null}.
+   */
+  @Override
+  public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+    return bytes != null ? new String(bytes, UTF_8) : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java
new file mode 100644
index 0000000..c4a7e15
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.kafka.utils;
+
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+
+import java.io.IOException;
+
+/**
+ * A test harness that brings up an embedded Zookeeper instance.
+ * <p>
+ * Adapted from the {@code kafka.zk.ZooKeeperTestHarness} class.
+ * </p>
+ */
+public class ZookeeperTestHarness {
+
+  /**
+   * Zookeeper connection info.
+   */
+  protected final String zookeeperConnect;
+
+  private EmbeddedZookeeper zookeeper;
+  private final int zkConnectionTimeout;
+  private final int zkSessionTimeout;
+
+  /**
+   * Zookeeper client connection.
+   */
+  protected ZkUtils zkUtils;
+
+  /**
+   * Creates a new Zookeeper broker test harness.
+   * <p>
+   * The port is the first one returned by {@code KafkaTestUtils.getPorts(1)}.
+   * </p>
+   */
+  public ZookeeperTestHarness() {
+    this(KafkaTestUtils.getPorts(1)[0]);
+  }
+
+  /**
+   * Creates a new Zookeeper service test harness using the given port.
+   * <p>
+   * Nothing is started until {@link #setUp()} is called.
+   * </p>
+   *
+   * @param zookeeperPort The port number to use for Zookeeper client connections.
+   */
+  public ZookeeperTestHarness(int zookeeperPort) {
+    this.zookeeper = null;
+    this.zkUtils = null;
+    this.zkConnectionTimeout = 6000;
+    this.zkSessionTimeout = 6000;
+    this.zookeeperConnect = "localhost:" + zookeeperPort;
+  }
+
+  /**
+   * Returns a client for communicating with the Zookeeper service.
+   *
+   * @return A Zookeeper client.
+   *
+   * @throws IllegalStateException
+   *             if Zookeeper has not yet been {@link #setUp()}, or has already been {@link #tearDown() torn down}.
+   */
+  public ZkClient getZkClient() {
+    if (zkUtils == null) {
+      throw new IllegalStateException("Zookeeper service is not active");
+    }
+    return zkUtils.zkClient();
+  }
+
+  /**
+   * Returns the {@link ZkUtils} instance created by {@link #setUp()}.
+   *
+   * @return the current {@link ZkUtils}, or {@code null} if Zookeeper has not yet been {@link #setUp()} or has
+   *      already been {@link #tearDown() torn down}.
+   */
+  public ZkUtils getZkUtils() {
+    return zkUtils;
+  }
+
+  /**
+   * Startup Zookeeper.
+   *
+   * @throws IOException if an error occurs during Zookeeper initialization.
+   */
+  public void setUp() throws IOException {
+    zookeeper = new EmbeddedZookeeper(zookeeperConnect);
+    ZkClient zkClient = new ZkClient(zookeeperConnect, zkSessionTimeout, zkConnectionTimeout, new ZkStringSerializer());
+    ZkConnection connection = new ZkConnection(zookeeperConnect, zkSessionTimeout);
+    zkUtils = new ZkUtils(zkClient, connection, false);
+  }
+
+  /**
+   * Shutdown Zookeeper.
+   * <p>
+   * The embedded server is shut down even if closing the client fails, and both references are cleared up front
+   * so that a repeated call is a no-op.
+   * </p>
+   *
+   * @throws IOException if the embedded Zookeeper instance could not be shut down cleanly.
+   */
+  public void tearDown() throws IOException {
+    ZkUtils utilsToClose = zkUtils;
+    EmbeddedZookeeper serverToStop = zookeeper;
+    zkUtils = null;
+    zookeeper = null;
+
+    try {
+      if (utilsToClose != null) {
+        utilsToClose.close();
+      }
+    } finally {
+      // Never leave the server (and its port) alive because the client failed to close.
+      if (serverToStop != null) {
+        serverToStop.shutdown();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/resources/log4j.properties b/crunch-kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..0b3eeee
--- /dev/null
+++ b/crunch-kafka/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, A1
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+
+# Print the date in ISO 8601 format
+log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+# Limit Apache logging to keep us from being overwhelmed as our tests stop and restart servers.
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.apache.zookeeper.client.ZooKeeperSaslClient=ERROR
+log4j.logger.org.apache.hadoop.conf.Configuration=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5b390ee..78ea085 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@ under the License.
     <module>crunch-spark</module>
     <module>crunch-hive</module>
     <module>crunch-dist</module>
+    <module>crunch-kafka</module>
   </modules>
   <profiles>
     <profile>
@@ -103,6 +104,7 @@ under the License.
     <hbase.version>1.0.0</hbase.version>
     <avro.classifier>hadoop2</avro.classifier>
 
+    <kafka.version>0.9.0.1</kafka.version>
     <scala.base.version>2.10</scala.base.version>
     <scala.version>2.10.4</scala.version>
     <scalatest.version>2.2.4</scalatest.version>
@@ -455,6 +457,17 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>${kafka.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka_${scala.base.version}</artifactId>
+        <version>${kafka.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.google.code.findbugs</groupId>
         <artifactId>jsr305</artifactId>
         <version>${jsr305.version}</version>


[2/3] crunch git commit: CRUNCH-606: Kafka Source for Crunch which supports reading data as BytesWritable

Posted by mk...@apache.org.
CRUNCH-606: Kafka Source for Crunch which supports reading data as BytesWritable

* Some of the code contributed by Bryan Baugher and Andrew Olson


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/321cfef6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/321cfef6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/321cfef6

Branch: refs/heads/master
Commit: 321cfef6e85325ab7a4d9548686a96972f6f31fd
Parents: c09c4ee
Author: Micah Whitacre <mk...@gmail.com>
Authored: Mon Apr 11 09:47:33 2016 -0500
Committer: Micah Whitacre <mk...@gmail.com>
Committed: Mon May 23 15:13:36 2016 -0500

----------------------------------------------------------------------
 crunch-kafka/pom.xml                            |  82 ++++
 .../java/org/apache/crunch/kafka/KafkaData.java |  63 +++
 .../crunch/kafka/KafkaRecordsIterable.java      | 294 +++++++++++++
 .../org/apache/crunch/kafka/KafkaSource.java    | 225 ++++++++++
 .../org/apache/crunch/kafka/KafkaUtils.java     | 301 ++++++++++++++
 .../kafka/inputformat/KafkaInputFormat.java     | 235 +++++++++++
 .../kafka/inputformat/KafkaInputSplit.java      | 117 ++++++
 .../kafka/inputformat/KafkaRecordReader.java    | 152 +++++++
 .../org/apache/crunch/kafka/ClusterTest.java    | 217 ++++++++++
 .../org/apache/crunch/kafka/KafkaDataIT.java    | 118 ++++++
 .../crunch/kafka/KafkaRecordsIterableIT.java    | 415 +++++++++++++++++++
 .../org/apache/crunch/kafka/KafkaSourceIT.java  | 169 ++++++++
 .../org/apache/crunch/kafka/KafkaUtilsIT.java   | 188 +++++++++
 .../kafka/inputformat/KafkaInputFormatIT.java   | 407 ++++++++++++++++++
 .../kafka/inputformat/KafkaInputSplitTest.java  |  65 +++
 .../kafka/inputformat/KafkaRecordReaderIT.java  | 122 ++++++
 .../crunch/kafka/utils/EmbeddedZookeeper.java   | 102 +++++
 .../kafka/utils/KafkaBrokerTestHarness.java     | 369 +++++++++++++++++
 .../crunch/kafka/utils/KafkaTestUtils.java      |  94 +++++
 .../crunch/kafka/utils/ZkStringSerializer.java  |  43 ++
 .../kafka/utils/ZookeeperTestHarness.java       | 112 +++++
 .../src/test/resources/log4j.properties         |  29 ++
 pom.xml                                         |  13 +
 23 files changed, 3932 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml
new file mode 100644
index 0000000..a96a9b0
--- /dev/null
+++ b/crunch-kafka/pom.xml
@@ -0,0 +1,82 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.crunch</groupId>
+    <artifactId>crunch-parent</artifactId>
+    <version>0.14.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>crunch-kafka</artifactId>
+  <name>Apache Crunch for Kafka</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.base.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
new file mode 100644
index 0000000..6543aad
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * {@link ReadableData} implementation that reads key/value pairs directly from Kafka for a fixed set of
+ * topic partitions and offset ranges.
+ *
+ * @param <K> the type of the record keys read from Kafka
+ * @param <V> the type of the record values read from Kafka
+ */
+class KafkaData<K, V> implements ReadableData<Pair<K, V>> {
+
+  private static final long serialVersionUID = -6582212311361579556L;
+
+  /**
+   * Start and end offsets, keyed by partition, handed to the {@link KafkaRecordsIterable} on every read.
+   */
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+  /**
+   * Properties used both to create the Kafka consumer and to configure the {@link KafkaRecordsIterable}.
+   */
+  private final Properties props;
+
+  /**
+   * Creates the readable data for the given connection and offsets.  Both arguments are stored as provided;
+   * no copy is made.
+   *
+   * @param connectionProperties the properties used to create the Kafka consumer
+   * @param offsets the start and end offsets to read for each topic partition
+   */
+  public KafkaData(Properties connectionProperties,
+                   Map<TopicPartition, Pair<Long, Long>> offsets) {
+    this.props = connectionProperties;
+    this.offsets = offsets;
+  }
+
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return always an empty set; this data is not backed by any {@link SourceTarget}.  An empty set is
+   * returned instead of {@code null} so that callers can safely iterate or copy the result.
+   */
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return java.util.Collections.<SourceTarget<?>>emptySet();
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    //no-op
+  }
+
+  /**
+   * Creates a new Kafka consumer from the connection properties and wraps it in a {@link KafkaRecordsIterable}
+   * covering the configured offsets.  The {@code context} is not used.
+   *
+   * @param context the task context, ignored
+   * @return an iterable over the key/value pairs found in the configured offset ranges
+   * @throws IOException declared by the interface; not thrown directly by this implementation
+   */
+  @Override
+  public Iterable<Pair<K, V>> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+    Consumer<K, V> consumer = new KafkaConsumer<K, V>(props);
+    return new KafkaRecordsIterable<>(consumer, offsets, props);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
new file mode 100644
index 0000000..8fec7f8
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.kafka.inputformat.KafkaRecordReader;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * An {@link Iterable} over the key/value pairs stored in Kafka between a start and an end offset for a set of
+ * topic partitions.
+ *
+ * @param <K> the type of the record keys
+ * @param <V> the type of the record values
+ */
+class KafkaRecordsIterable<K, V> implements Iterable<Pair<K, V>> {
+
+  /**
+   * Logger
+   */
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordsIterable.class);
+
+  /**
+   * The Kafka consumer responsible for retrieving messages.
+   */
+  private final Consumer<K, V> consumer;
+
+  /**
+   * The offset ranges, keyed by partition, that can produce data.  The first value of each pair is the offset
+   * the consumer seeks to and the second value is the end offset, which is excluded.
+   */
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+  /**
+   * Tracks if the iterable is empty.
+   */
+  private final boolean isEmpty;
+
+  /**
+   * The timeout handed to each poll request made to Kafka.
+   */
+  private final long scanPollTime;
+
+  /**
+   * The maximum number of attempts made for a single poll when Kafka reports a retriable error.
+   */
+  private final int maxRetryAttempts;
+
+  /**
+   * Creates the iterable that will pull values for a collection of topics using the provided {@code consumer}
+   * between the start and end offsets found in {@code offsets}.  Partitions whose start offset is not less than
+   * their end offset are ignored.
+   * @param consumer The consumer for pulling the data from Kafka.  The consumer will be closed automatically once all
+   *                 of the records have been consumed.  If no partition has data to read the consumer is never
+   *                 used and is not closed by this class.
+   * @param offsets offsets for pulling data
+   * @param properties properties for tweaking the behavior of the iterable.
+   * @throws IllegalArgumentException if any of the arguments are {@code null} or empty.
+   * @throws NumberFormatException if the configured retry attempts or poll timeout are not valid numbers.
+   */
+  public KafkaRecordsIterable(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, Long>> offsets,
+                              Properties properties) {
+    if (consumer == null) {
+      throw new IllegalArgumentException("The 'consumer' cannot be 'null'.");
+    }
+    this.consumer = consumer;
+
+    if (properties == null) {
+      throw new IllegalArgumentException("The 'properties' cannot be 'null'.");
+    }
+
+    String retryString = properties.getProperty(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY,
+        KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING);
+    maxRetryAttempts = Integer.parseInt(retryString);
+
+    if (offsets == null || offsets.isEmpty()) {
+      throw new IllegalArgumentException("The 'offsets' cannot be 'null' or empty.");
+    }
+
+    //filter out any topics and partitions that do not have offset ranges that will produce data.
+    Map<TopicPartition, Pair<Long, Long>> filteredOffsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      Pair<Long, Long> value = entry.getValue();
+      //the end offset is exclusive so there is only data to be had when start is strictly less than end
+      if (value.first() < value.second()) {
+        filteredOffsets.put(entry.getKey(), value);
+      } else {
+        LOG.debug("Removing offsets for {} because start is not less than the end offset.", entry.getKey());
+      }
+    }
+
+    //check to make sure that based on the offsets there is data to retrieve, otherwise false.
+    //there will be data if the start offsets are less than stop offsets
+    isEmpty = filteredOffsets.isEmpty();
+    if (isEmpty) {
+      LOG.warn("Iterable for Kafka is empty because no offset range has data to retrieve.");
+    }
+
+    //assign this
+    this.offsets = filteredOffsets;
+
+    scanPollTime = Long.parseLong(properties.getProperty(KafkaSource.CONSUMER_POLL_TIMEOUT_KEY,
+        Long.toString(KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT)));
+  }
+
+  /**
+   * Assigns the consumer to every partition that has data, seeks each partition to its start offset and returns
+   * an iterator over the records.  When no partition has data an empty iterator is returned and the consumer is
+   * left untouched.
+   *
+   * @return an iterator over the key/value pairs within the configured offset ranges.
+   */
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    if (isEmpty) {
+      LOG.debug("Returning empty iterator since offsets align.");
+      return Collections.emptyIterator();
+    }
+    //Assign consumer to all of the partitions
+    LOG.debug("Assigning topics and partitions and seeking to start offsets.");
+
+    consumer.assign(new LinkedList<>(offsets.keySet()));
+    //hack so maybe look at removing this
+    consumer.poll(0);
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      consumer.seek(entry.getKey(), entry.getValue().first());
+    }
+
+    return new RecordsIterator<K, V>(consumer, offsets, scanPollTime, maxRetryAttempts);
+  }
+
+  /**
+   * Iterator that polls the consumer for batches of records and emits those that fall inside the offset range
+   * of their partition.  A partition is considered finished once a record at or beyond its last wanted offset
+   * has been seen; the consumer is closed when every partition is finished.
+   */
+  private static class RecordsIterator<K, V> implements Iterator<Pair<K, V>> {
+
+    private final Consumer<K, V> consumer;
+    private final Map<TopicPartition, Pair<Long, Long>> offsets;
+    private final long pollTime;
+    private final int maxNumAttempts;
+    private ConsumerRecords<K, V> records;
+    private Iterator<ConsumerRecord<K, V>> currentIterator;
+    private final Set<TopicPartition> remainingPartitions;
+
+    /**
+     * The value that the next call to {@link #next()} will return, or {@code null} if it has not been
+     * retrieved yet or there is nothing left.
+     */
+    private Pair<K, V> next;
+
+    public RecordsIterator(Consumer<K, V> consumer,
+                           Map<TopicPartition, Pair<Long, Long>> offsets, long pollTime, int maxNumRetries) {
+      this.consumer = consumer;
+      remainingPartitions = new HashSet<>(offsets.keySet());
+      this.offsets = offsets;
+      this.pollTime = pollTime;
+      this.maxNumAttempts = maxNumRetries;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (next != null) {
+        return true;
+      }
+
+      //if partitions to consume then pull next value
+      if (remainingPartitions.size() > 0) {
+        next = getNext();
+      }
+
+      return next != null;
+    }
+
+    @Override
+    public Pair<K, V> next() {
+      if (next == null) {
+        next = getNext();
+      }
+
+      if (next != null) {
+        Pair<K, V> returnedNext = next;
+        //prime for next call, this is also what closes the consumer once the last value has been handed out
+        next = getNext();
+        //return the current next
+        return returnedNext;
+      } else {
+        throw new NoSuchElementException("No more elements.");
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove is not supported.");
+    }
+
+    /**
+     * Gets the current iterator, polling Kafka for a new batch of records when the current batch is exhausted.
+     * A poll that fails with a {@link RetriableException} is attempted again until the maximum number of
+     * attempts is reached, at which point the exception is rethrown.  At least one poll is always made, even
+     * when the configured maximum is zero or negative.
+     *
+     * @return the current iterator or {@code null} if the poll returned no records or there are no more
+     * partitions to consume.
+     */
+    private Iterator<ConsumerRecord<K, V>> getIterator() {
+      if (!remainingPartitions.isEmpty()) {
+        if (currentIterator != null && currentIterator.hasNext()) {
+          return currentIterator;
+        }
+        LOG.debug("Retrieving next set of records.");
+        int numTries = 0;
+        boolean success = false;
+        //always poll at least once, otherwise a non-positive maximum would leave 'records' untouched and the
+        //caller would spin forever waiting for data that is never requested.
+        do {
+          try {
+            records = consumer.poll(pollTime);
+            success = true;
+          } catch (RetriableException re) {
+            numTries++;
+            if (numTries < maxNumAttempts) {
+              LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re);
+            } else {
+              LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumAttempts, re);
+              throw re;
+            }
+          }
+        } while (!success);
+
+        if (records == null || records.isEmpty()) {
+          LOG.debug("Retrieved empty records.");
+          currentIterator = null;
+          return null;
+        }
+        currentIterator = records.iterator();
+        return currentIterator;
+      }
+
+      LOG.debug("No more partitions to consume therefore not retrieving any more records.");
+      return null;
+    }
+
+    /**
+     * Internal method for retrieving the next value to retrieve.  Keeps requesting batches until a record
+     * within range is found or every partition has been completed, in which case the consumer is closed.
+     *
+     * @return {@code null} if there are no more values to retrieve otherwise the next event.
+     */
+    private Pair<K, V> getNext() {
+      while (!remainingPartitions.isEmpty()) {
+        Iterator<ConsumerRecord<K, V>> iterator = getIterator();
+
+        while (iterator != null && iterator.hasNext()) {
+          ConsumerRecord<K, V> record = iterator.next();
+          TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
+          long offset = record.offset();
+
+          if (withinRange(topicPartition, offset)) {
+            LOG.debug("Retrieving value for {} with offset {}.", topicPartition, offset);
+            return Pair.of(record.key(), record.value());
+          }
+          LOG.debug("Value for {} with offset {} is outside of range skipping.", topicPartition, offset);
+        }
+      }
+
+      LOG.debug("Closing the consumer because there are no more remaining partitions.");
+      consumer.close();
+
+      LOG.debug("Consumed data from all partitions.");
+      return null;
+
+    }
+
+    /**
+     * Checks whether the value for {@code topicPartition} with an {@code offset} is within scan range.  If
+     * the value is not then {@code false} is returned otherwise {@code true}.  As a side effect, once the
+     * offset reaches the last wanted offset of the partition (or beyond) the partition is removed from the
+     * remaining partitions and paused on the consumer.
+     *
+     * @param topicPartition The partition for the offset
+     * @param offset the offset in the partition
+     * @return {@code true} if the value is within the expected consumption range, otherwise {@code false}.
+     */
+    private boolean withinRange(TopicPartition topicPartition, long offset) {
+      long endOffset = offsets.get(topicPartition).second();
+      //end offsets are one higher than the last written value.
+      boolean emit = offset < endOffset;
+      if (offset >= endOffset - 1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.",
+              new Object[]{topicPartition, offset, endOffset});
+        }
+        remainingPartitions.remove(topicPartition);
+        consumer.pause(topicPartition);
+      }
+      //only report the value as within range when it really is, the caller logs the skipped case.
+      if (emit) {
+        LOG.debug("Value for partition {} and offset {} is within range.", topicPartition, offset);
+      }
+      return emit;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
new file mode 100644
index 0000000..485604d
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Crunch Source that will retrieve events from Kafka given start and end offsets.  The source is not designed to
+ * process unbounded data but instead to retrieve data between a specified range.
+ * <p>
+ *
+ * The values retrieved from Kafka are returned as raw bytes inside of a {@link BytesWritable}.  If callers
+ * need specific parsing logic based on the topic then consumers are encouraged to use multiple Kafka Sources
+ * for each topic and use special {@link DoFn} to parse the payload.
+ */
+public class KafkaSource
+    implements TableSource<BytesWritable, BytesWritable>, ReadableSource<Pair<BytesWritable, BytesWritable>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+  private final FormatBundle inputBundle;
+  private final Properties props;
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+  /**
+   * The consistent PType describing all of the data being retrieved from Kafka as a BytesWritable.
+   */
+  private static PTableType<BytesWritable, BytesWritable> KAFKA_SOURCE_TYPE =
+      Writables.tableOf(Writables.writables(BytesWritable.class), Writables.writables(BytesWritable.class));
+
+  /**
+   * Constant to indicate how long the reader waits before timing out when retrieving data from Kafka.
+   */
+  public static final String CONSUMER_POLL_TIMEOUT_KEY = "org.apache.crunch.kafka.consumer.poll.timeout";
+
+  /**
+   * Default timeout value for {@link #CONSUMER_POLL_TIMEOUT_KEY} of 1 second.
+   */
+  public static final long CONSUMER_POLL_TIMEOUT_DEFAULT = 1000L;
+
+
+  /**
+   * Constructs a Kafka source that will read data from the Kafka cluster identified by the {@code kafkaConnectionProperties}
+   * and from the specific topics and partitions identified in the {@code offsets}
+   * @param kafkaConnectionProperties The connection properties for reading from Kafka.  These properties will be honored
+   *                                  with the exception of the {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+   *                                  {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+   * @param offsets A map of {@link TopicPartition} to a pair of start and end offsets respectively.  The start and end offsets
+   *                are evaluated at [start, end) where the ending offset is excluded.  Each TopicPartition must have a
+   *                non-null pair describing its offsets.  The start offset should be less than the end offset.  If the values
+   *                are equal or start is greater than the end then that partition will be skipped.
+   */
+  public KafkaSource(Properties kafkaConnectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) {
+    this.props = copyAndSetProperties(kafkaConnectionProperties);
+
+    inputBundle = createFormatBundle(props, offsets);
+
+    this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
+  }
+
+  @Override
+  public Source<Pair<BytesWritable, BytesWritable>> inputConf(String key, String value) {
+    inputBundle.set(key, value);
+    return this;
+  }
+
+  @Override
+  public PType<Pair<BytesWritable, BytesWritable>> getType() {
+    return KAFKA_SOURCE_TYPE;
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return KAFKA_SOURCE_TYPE.getConverter();
+  }
+
+  @Override
+  public PTableType<BytesWritable, BytesWritable> getTableType() {
+    return KAFKA_SOURCE_TYPE;
+  }
+
+  @Override
+  public long getSize(Configuration configuration) {
+    // TODO something smarter here.
+    return 1000L * 1000L * 1000L;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaSource("+props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)+")";
+  }
+
+  @Override
+  public long getLastModifiedAt(Configuration configuration) {
+    LOG.warn("Cannot determine last modified time for source: {}", toString());
+    return -1;
+  }
+
+  private static <K, V> FormatBundle createFormatBundle(Properties kafkaConnectionProperties,
+                                                        Map<TopicPartition, Pair<Long, Long>> offsets) {
+
+    FormatBundle<KafkaInputFormat> bundle = FormatBundle.forInput(KafkaInputFormat.class);
+
+    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+    for (String name : kafkaConnectionProperties.stringPropertyNames()) {
+      bundle.set(name, kafkaConnectionProperties.getProperty(name));
+    }
+
+    return bundle;
+  }
+
+  private static <K, V> Properties copyAndSetProperties(Properties kakfaConnectionProperties) {
+    Properties props = new Properties();
+    props.putAll(kakfaConnectionProperties);
+
+    //Setting the key/value deserializer to ensure proper translation from Kafka to PType format.
+    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+
+    return props;
+  }
+
+
+  @Override
+  public Iterable<Pair<BytesWritable, BytesWritable>> read(Configuration conf) throws IOException {
+    // consumer will get closed when the iterable is fully consumed.
+    // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity
+    // of parallelism when reading.
+    Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props);
+    return new KafkaRecordsIterable<>(consumer, offsets, props);
+  }
+
+
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    Configuration conf = job.getConfiguration();
+    //an id of -1 indicates that this is the only input so just use it directly
+    if (inputId == -1) {
+      job.setMapperClass(CrunchMapper.class);
+      job.setInputFormatClass(inputBundle.getFormatClass());
+      inputBundle.configure(conf);
+    } else {
+      //there are multiple inputs for this mapper so add it as a CrunchInputs and need a fake path just to
+      //make it play well with other file based inputs.
+      Path dummy = new Path("/kafka/" + inputId);
+      CrunchInputs.addInputPath(job, dummy, inputBundle, inputId);
+    }
+  }
+
+  @Override
+  public ReadableData<Pair<BytesWritable, BytesWritable>> asReadable() {
+    // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity
+    // of parallelism when reading.
+    return new KafkaData<>(props, offsets);
+  }
+
+
+  /**
+   * Basic {@link Deserializer} which simply wraps the payload as a {@link BytesWritable}.
+   */
+  public static class BytesDeserializer implements Deserializer<BytesWritable> {
+
+    @Override
+    public void configure(Map<String, ?> configProperties, boolean isKey) {
+      //no-op
+    }
+
+    @Override
+    public BytesWritable deserialize(String topic, byte[] valueBytes) {
+      return new BytesWritable(valueBytes);
+    }
+
+    @Override
+    public void close() {
+      //no-op
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
new file mode 100644
index 0000000..aeea6fb
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
+import kafka.cluster.BrokerEndPoint;
+import kafka.cluster.EndPoint;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.TopicMetadataResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * Simple utilities for retrieving offset and Kafka information to assist in setting up and configuring a
+ * {@link KafkaSource} instance.
+ */
+public class KafkaUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+
+  private static final String CLIENT_ID = "crunch-kafka-client";
+
+  private static final Random RANDOM = new Random();
+
+  /**
+   * Configuration property for the number of retry attempts that will be made to Kafka.
+   */
+  public static final String KAFKA_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.attempts";
+
+  /**
+   * Default number of retry attempts.
+   */
+  public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 5;
+  public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
+
+  /**
+   * Converts the provided {@code config} into a {@link Properties} object to connect with Kafka.
+   * @param config the config to read properties
+   * @return a properties instance populated with all of the values inside the provided {@code config}.
+   */
+  public static Properties getKafkaConnectionProperties(Configuration config) {
+    final Properties connectionProperties = new Properties();
+
+    // copy every key/value entry exposed by iterating the configuration
+    for (final Map.Entry<String, String> entry : config) {
+      final String name = entry.getKey();
+      connectionProperties.setProperty(name, entry.getValue());
+    }
+
+    return connectionProperties;
+  }
+
+  /**
+   * Adds the {@code properties} to the provided {@code config} instance.
+   * @param properties the properties to add to the config.
+   * @param config the configuration instance to be modified.
+   * @return the config instance with the populated properties
+   */
+  public static Configuration addKafkaConnectionProperties(Properties properties, Configuration config) {
+    // only the properties reported by stringPropertyNames() are transferred
+    for (final String propertyName : properties.stringPropertyNames()) {
+      final String propertyValue = properties.getProperty(propertyName);
+      config.set(propertyName, propertyValue);
+    }
+
+    return config;
+  }
+
+  /**
+   * Returns a {@link TopicMetadataRequest} from the given topics
+   *
+   * @param topics an array of topics you want metadata for
+   * @return a {@link TopicMetadataRequest} from the given topics
+   * @throws IllegalArgumentException if topics is {@code null} or empty, or if any of the topics is null, empty or blank
+   */
+  private static TopicMetadataRequest getTopicMetadataRequest(String... topics) {
+    if (topics == null)
+      throw new IllegalArgumentException("topics cannot be null");
+    if (topics.length == 0)
+      throw new IllegalArgumentException("topics cannot be empty");
+
+    for (String topic : topics)
+      if (StringUtils.isBlank(topic))
+        throw new IllegalArgumentException("No topic can be null, empty or blank");
+
+    return new TopicMetadataRequest(Arrays.asList(topics));
+  }
+
+  /**
+   * <p>
+   * Retrieves the offset values for an array of topics at the specified time.
+   * </p>
+   * <p>
+   * If the Kafka cluster does not have the logs for the partition at the specified time or if the topic did not exist
+   * at that time this will instead return the earliest offset for that partition.
+   * </p>
+   *
+   * @param properties the properties containing the configuration for kafka
+   * @param time       the time at which we want to know what the offset values were
+   * @param topics     the topics we want to know the offset values of
+   * @return the offset values for an array of topics at the specified time
+   * @throws IllegalArgumentException if properties is {@code null} or if topics is {@code null} or empty or if any of
+   *                                  the topics are {@code null}, empty or blank, or if there is an error parsing the
+   *                                  properties.
+   * @throws IllegalStateException if there is an error communicating with the Kafka cluster to retrieve information.
+   */
+  public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) {
+    if (properties == null) {
+      throw new IllegalArgumentException("properties cannot be null");
+    }
+
+    // shuffle so the brokers are not always contacted in the same order
+    final List<Broker> shuffledBrokers = getBrokers(properties);
+    Collections.shuffle(shuffledBrokers, RANDOM);
+
+    return getBrokerOffsets(shuffledBrokers, time, topics);
+  }
+
+  // Visible for testing
+  static Map<TopicPartition, Long> getBrokerOffsets(List<Broker> brokers, long time, String... topics) {
+    if (topics == null)
+      throw new IllegalArgumentException("topics cannot be null");
+    if (topics.length == 0)
+      throw new IllegalArgumentException("topics cannot be empty");
+
+    for (String topic : topics)
+      if (StringUtils.isBlank(topic))
+        throw new IllegalArgumentException("No topic can be null, empty or blank");
+
+    TopicMetadataResponse topicMetadataResponse = null;
+
+    final TopicMetadataRequest topicMetadataRequest = getTopicMetadataRequest(topics);
+
+    for (final Broker broker : brokers) {
+      final SimpleConsumer consumer = getSimpleConsumer(broker);
+      try {
+        topicMetadataResponse = consumer.send(topicMetadataRequest);
+        break;
+      } catch (Exception err) {
+        EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+        LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed",
+            Arrays.toString(topics), endpoint.host()), err);
+      } finally {
+        consumer.close();
+      }
+    }
+
+    if (topicMetadataResponse == null) {
+      throw new IllegalStateException(String.format("Fetching topic metadata for topic(s) '%s' from broker(s) '%s' failed",
+          Arrays.toString(topics), Arrays.toString(brokers.toArray())));
+    }
+
+    // From the topic metadata, build a PartitionOffsetRequestInfo for each partition of each topic. It should be noted that
+    // only the leader Broker has the partition offset information[1] so save the leader Broker so we
+    // can send the request to it.
+    // [1] - https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-OffsetAPI
+    Map<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequests =
+        new HashMap<>();
+
+    for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
+      for (PartitionMetadata partition : metadata.partitionsMetadata()) {
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
+            new HashMap<>();
+
+        BrokerEndPoint brokerEndPoint = partition.leader();
+        Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT);
+
+        if (brokerRequests.containsKey(leader))
+          requestInfo = brokerRequests.get(leader);
+
+        requestInfo.put(new TopicAndPartition(metadata.topic(), partition.partitionId()), new PartitionOffsetRequestInfo(
+            time, 1));
+
+        brokerRequests.put(leader, requestInfo);
+      }
+    }
+
+    Map<TopicPartition, Long> topicPartitionToOffset = new HashMap<>();
+
+    // Send the offset request to the leader broker
+    for (Map.Entry<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequest : brokerRequests.entrySet()) {
+      SimpleConsumer simpleConsumer = getSimpleConsumer(brokerRequest.getKey());
+
+      OffsetResponse offsetResponse = null;
+      try {
+        OffsetRequest offsetRequest = new OffsetRequest(brokerRequest.getValue(), kafka.api.OffsetRequest.CurrentVersion(),
+            CLIENT_ID);
+        offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
+      } finally {
+        simpleConsumer.close();
+      }
+
+      Map<TopicPartition, Long> earliestOffsets = null;
+
+      // Retrieve/parse the results
+      for (Map.Entry<TopicAndPartition, PartitionOffsetRequestInfo> entry : brokerRequest.getValue().entrySet()) {
+        TopicAndPartition topicAndPartition = entry.getKey();
+        TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
+        long[] offsets = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition());
+        long offset;
+
+        // The Kafka API will return no value if a time is given which there is no log that contains messages from that time
+        // (i.e. before a topic existed or in a log that was rolled/cleaned)
+        if (offsets.length > 0) {
+          offset = offsets[0];
+        } else {
+          LOG.info("Kafka did not have an offset for topic/partition [{}]. Returning earliest known offset instead",
+              topicAndPartition);
+
+          // This shouldn't happen but if kafka's API did not provide us with a value and we are asking for the earliest
+          // time we can't be sure what to do so quit
+          if (time == kafka.api.OffsetRequest.EarliestTime())
+            throw new IllegalStateException("We requested the earliest offsets for topic [" + topicAndPartition.topic()
+                + "] but Kafka returned no values");
+
+          // Load the earliest offsets for the topic if it hasn't been loaded already
+          if (earliestOffsets == null)
+            earliestOffsets = getBrokerOffsets(Arrays.asList(brokerRequest.getKey()),
+                kafka.api.OffsetRequest.EarliestTime(), topicAndPartition.topic());
+
+          offset = earliestOffsets.get(topicPartition);
+        }
+
+        topicPartitionToOffset.put(topicPartition, offset);
+      }
+    }
+
+    return topicPartitionToOffset;
+  }
+
+  /**
+   * Returns a {@link SimpleConsumer} connected to the given {@link Broker}
+   */
+  private static SimpleConsumer getSimpleConsumer(final Broker broker) {
+    // BrokerHost, BrokerPort, timeout, buffer size, client id
+    EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+    return new SimpleConsumer(endpoint.host(), endpoint.port(), 100000, 64 * 1024, CLIENT_ID);
+  }
+
+  /**
+   * Returns a {@link Broker} list parsed from the {@code metadata.broker.list} entry of the given {@link Properties}.
+   * The entry is expected to be a comma separated list of {@code host:port} pairs; whitespace surrounding an entry,
+   * a host or a port is ignored so that values such as {@code "host1:9092, host2:9092"} are accepted.
+   *
+   * @param properties the {@link Properties} with configuration to connect to a Kafka broker
+   * @return one {@link Broker} per {@code host:port} pair, in the order they were listed. Every returned broker is
+   * created with an id of {@code 0} and the {@link SecurityProtocol#PLAINTEXT} protocol.
+   * @throws IllegalArgumentException if {@code properties} is {@code null}, the broker list is missing, or an entry
+   * is not a valid {@code host:port} pair.
+   */
+  private static List<Broker> getBrokers(final Properties properties) {
+    if (properties == null)
+      throw new IllegalArgumentException("props cannot be null");
+
+    String commaDelimitedBrokerList = properties.getProperty("metadata.broker.list");
+    if (commaDelimitedBrokerList == null)
+      throw new IllegalArgumentException("Unable to find 'metadata.broker.list' in given properties");
+
+    // Split broker list into host/port pairs
+    String[] brokerPortList = commaDelimitedBrokerList.split(",");
+    if (brokerPortList.length < 1)
+      throw new IllegalArgumentException("Unable to parse broker list : [" + Arrays.toString(brokerPortList) + "]");
+
+    final List<Broker> brokers = new ArrayList<Broker>(brokerPortList.length);
+    for (final String brokerHostPortString : brokerPortList) {
+      // Split host/port, tolerating whitespace around the list separators
+      String[] brokerHostPort = brokerHostPortString.trim().split(":");
+      if (brokerHostPort.length != 2)
+        throw new IllegalArgumentException("Unable to parse host/port from broker string : ["
+            + Arrays.toString(brokerHostPort) + "] from broker list : [" + Arrays.toString(brokerPortList) + "]");
+
+      final String host = brokerHostPort[0].trim();
+      final String port = brokerHostPort[1].trim();
+      try {
+        brokers.add(new Broker(0, host, Integer.parseInt(port), SecurityProtocol.PLAINTEXT));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Error parsing broker port : " + port, e);
+      }
+    }
+    return brokers;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
new file mode 100644
index 0000000..eba4a97
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Basic input format for reading data from Kafka.  Data is read and maintained in its pure byte form and wrapped
+ * inside of a {@link BytesWritable} instance.
+ *
+ * Populating the configuration of the input format is handled with the convenience method of
+ * {@link #writeOffsetsToConfiguration(Map, Configuration)}.  This should be done to ensure
+ * the Kafka offset information is available when the input format {@link #getSplits(JobContext) creates its splits}
+ * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}.
+ */
+public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable {
+
+  /**
+   * Constant for constructing configuration keys for the input format.
+   */
+  private static final String KAFKA_INPUT_OFFSETS_BASE = "org.apache.crunch.kafka.offsets.topic";
+
+  /**
+   * Constant used for building configuration keys and specifying partitions.
+   */
+  private static final String PARTITIONS = "partitions";
+
+  /**
+   * Constant used for building configuration keys and specifying the start of a partition.
+   */
+  private static final String START = "start";
+
+  /**
+   * Constant used for building configuration keys and specifying the end of a partition.
+   */
+  private static final String END = "end";
+
+  /**
+   * Regex to discover all of the defined partitions which should be consumed by the input format.
+   */
+  private static final String TOPIC_KEY_REGEX = KAFKA_INPUT_OFFSETS_BASE + "\\..*\\." + PARTITIONS + "$";
+
+  private Configuration configuration;
+
+  /**
+   * Creates one {@link KafkaInputSplit} for every topic partition found in the {@link #getConf() configuration}
+   * whose start offset differs from its end offset.  Partitions with identical start and end offsets are skipped.
+   *
+   * @param jobContext the context of the job; not consulted, the offsets are read from {@link #getConf()}.
+   * @return the splits to process, possibly empty.
+   * @throws IllegalStateException if a configured partition is missing its start or end offset.
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+    Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf());
+    List<InputSplit> splits = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      TopicPartition topicPartition = entry.getKey();
+
+      long start = entry.getValue().first();
+      long end = entry.getValue().second();
+      if (start != end) {
+        splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), start, end));
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * Returns a new, uninitialized {@link KafkaRecordReader}.
+   */
+  @Override
+  public RecordReader<BytesWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    return new KafkaRecordReader<>();
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return configuration;
+  }
+
+
+  //The following methods are used for reading and writing Kafka Partition offset information into Hadoop's Configuration
+  //objects and into Crunch's FormatBundle.  For a specific Kafka Topic it might have one or many partitions and for
+  //each partition it will need a start and end offset.  Assuming you have a topic of "abc" and it has 2 partitions the
+  //configuration would be populated with the following:
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions = [0,1]
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = <partition start>
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end = <partition end>
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start = <partition start>
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end = <partition end>
+
+  /**
+   * Writes the start and end offsets for the provided topic partitions to the {@code bundle}.
+   * @param offsets The starting and ending offsets for the topics and partitions.
+   * @param bundle the bundle into which the information should be persisted.
+   */
+  public static void writeOffsetsToBundle(Map<TopicPartition, Pair<Long, Long>> offsets, FormatBundle bundle) {
+    for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) {
+      bundle.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Writes the start and end offsets for the provided topic partitions to the {@code config}.
+   * @param offsets The starting and ending offsets for the topics and partitions.
+   * @param config the config into which the information should be persisted.
+   */
+  public static void writeOffsetsToConfiguration(Map<TopicPartition, Pair<Long, Long>> offsets, Configuration config) {
+    for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) {
+      config.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Reads the {@code configuration} to determine which topics, partitions, and offsets should be used for reading data.
+   *
+   * @param configuration the configuration to derive the data to read.
+   * @return a map of {@link TopicPartition} to a pair of start and end offsets.
+   * @throws IllegalStateException if the {@code configuration} does not have the start and end offsets set properly
+   * for a partition.
+   */
+  public static Map<TopicPartition, Pair<Long, Long>> getOffsets(Configuration configuration) {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    //find configuration for all of the topics with defined partitions
+    Map<String, String> topicPartitionKeys = configuration.getValByRegex(TOPIC_KEY_REGEX);
+
+    //for each topic start to process its partitions
+    for (String key : topicPartitionKeys.keySet()) {
+      String topic = getTopicFromKey(key);
+      int[] partitions = configuration.getInts(key);
+      //for each partition find and add the start/end offset
+      for (int partitionId : partitions) {
+        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+        //Long.MIN_VALUE is the sentinel for "offset not present in the configuration"
+        long start = configuration.getLong(generatePartitionStartKey(topic, partitionId), Long.MIN_VALUE);
+        long end = configuration.getLong(generatePartitionEndKey(topic, partitionId), Long.MIN_VALUE);
+
+        if (start == Long.MIN_VALUE || end == Long.MIN_VALUE) {
+          throw new IllegalStateException("The " + topicPartition + " has an invalid start:" + start + " or end:" + end
+              + " offset configured.");
+        }
+
+        offsets.put(topicPartition, Pair.of(start, end));
+      }
+    }
+
+    return offsets;
+  }
+
+  /**
+   * Converts the {@code offsets} into the key/value pairs described above: a start and an end entry for every
+   * partition plus one comma separated partition list per topic.
+   *
+   * @param offsets The starting and ending offsets for the topics and partitions.
+   * @return the configuration keys mapped to their string values.
+   */
+  private static Map<String, String> generateValues(Map<TopicPartition, Pair<Long, Long>> offsets) {
+    Map<String, String> offsetConfigValues = new HashMap<>();
+    Map<String, Set<Integer>> topicsPartitions = new HashMap<>();
+
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      TopicPartition topicPartition = entry.getKey();
+      String topic = topicPartition.topic();
+      int partition = topicPartition.partition();
+      String startKey = generatePartitionStartKey(topic, partition);
+      String endKey = generatePartitionEndKey(topic, partition);
+      //Add the start and end offsets for a specific partition
+      offsetConfigValues.put(startKey, Long.toString(entry.getValue().first()));
+      offsetConfigValues.put(endKey, Long.toString(entry.getValue().second()));
+
+      Set<Integer> partitions = topicsPartitions.get(topic);
+      if (partitions == null) {
+        partitions = new HashSet<>();
+        topicsPartitions.put(topic, partitions);
+      }
+      partitions.add(partition);
+    }
+
+    //generate the partitions values for each topic
+    for (Map.Entry<String, Set<Integer>> entry : topicsPartitions.entrySet()) {
+      String key = generateTopicPartitionsKey(entry.getKey());
+      String partitionsString = StringUtils.join(entry.getValue(), ",");
+      offsetConfigValues.put(key, partitionsString);
+    }
+
+    return offsetConfigValues;
+  }
+
+  /**
+   * Returns the configuration key holding the start offset of the {@code partition} of the {@code topic}.
+   */
+  static String generatePartitionStartKey(String topic, int partition) {
+    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + START;
+  }
+
+  /**
+   * Returns the configuration key holding the end offset of the {@code partition} of the {@code topic}.
+   */
+  static String generatePartitionEndKey(String topic, int partition) {
+    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + END;
+  }
+
+  /**
+   * Returns the configuration key holding the list of partitions of the {@code topic}.
+   */
+  static String generateTopicPartitionsKey(String topic) {
+    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS;
+  }
+
+  /**
+   * Extracts the topic name from a key produced by {@link #generateTopicPartitionsKey(String)}.
+   */
+  static String getTopicFromKey(String key) {
+    //strip off the base key + a trailing "."
+    String value = key.substring(KAFKA_INPUT_OFFSETS_BASE.length() + 1);
+    //strip off the end part + a preceding "."
+    value = value.substring(0, (value.length() - (PARTITIONS.length() + 1)));
+
+    return value;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
new file mode 100644
index 0000000..c8ebc6a
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * An {@link InputSplit} describing the data of one {@link TopicPartition} that lies between a starting and an
+ * ending offset.
+ */
+public class KafkaInputSplit extends InputSplit implements Writable {
+
+  private long startingOffset;
+  private long endingOffset;
+  private TopicPartition topicPartition;
+
+  /**
+   * Nullary Constructor for creating the instance inside the Mapper instance; the state is expected to be filled
+   * in afterwards through {@link #readFields(DataInput)}.
+   */
+  public KafkaInputSplit() {
+
+  }
+
+  /**
+   * Creates a split covering the {@code partition} of the {@code topic} from {@code startingOffset} to
+   * {@code endingOffset}.
+   * @param topic the topic for the split
+   * @param partition the partition for the topic
+   * @param startingOffset the start of the split
+   * @param endingOffset the end of the split
+   */
+  public KafkaInputSplit(String topic, int partition, long startingOffset, long endingOffset) {
+    this.topicPartition = new TopicPartition(topic, partition);
+    this.startingOffset = startingOffset;
+    this.endingOffset = endingOffset;
+  }
+
+  /**
+   * Returns the size hint of the split: the distance between the offsets when the starting offset is positive,
+   * otherwise the ending offset itself.
+   */
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    // Only a hint for the size in bytes, so the number of offsets is an approximation at best.
+    if (startingOffset > 0) {
+      return endingOffset - startingOffset;
+    }
+    return endingOffset;
+  }
+
+  /**
+   * Returns an empty array because no location preference is reported for the split.
+   */
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    //Leave empty since data locality not really an issue.
+    final String[] noLocations = new String[0];
+    return noLocations;
+  }
+
+  /**
+   * @return the topic and partition for the split
+   */
+  public TopicPartition getTopicPartition() {
+    return this.topicPartition;
+  }
+
+  /**
+   * @return the starting offset for the split
+   */
+  public long getStartingOffset() {
+    return this.startingOffset;
+  }
+
+  /**
+   * @return the ending offset for the split
+   */
+  public long getEndingOffset() {
+    return this.endingOffset;
+  }
+
+  /**
+   * Serializes the split as: topic (UTF), partition (int), starting offset (long), ending offset (long).
+   */
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    final TopicPartition current = topicPartition;
+    dataOutput.writeUTF(current.topic());
+    dataOutput.writeInt(current.partition());
+    dataOutput.writeLong(startingOffset);
+    dataOutput.writeLong(endingOffset);
+  }
+
+  /**
+   * Restores the split from the layout produced by {@link #write(DataOutput)}.
+   */
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    final String readTopic = dataInput.readUTF();
+    final int readPartition = dataInput.readInt();
+    this.startingOffset = dataInput.readLong();
+    this.endingOffset = dataInput.readLong();
+    this.topicPartition = new TopicPartition(readTopic, readPartition);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder description = new StringBuilder();
+    description.append(getTopicPartition());
+    description.append(" Start: ").append(startingOffset);
+    description.append(" End: ").append(endingOffset);
+    return description.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
new file mode 100644
index 0000000..1420519
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
+import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
+import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;
+
+/**
+ * A {@link RecordReader} for pulling data from Kafka.  The reader is bound to the single topic partition described
+ * by the {@link KafkaInputSplit} it is initialized with and stops once a record at or past the split's ending
+ * offset is seen, or once a poll returns no records.
+ * @param <K> the key of the records from Kafka
+ * @param <V> the value of the records from Kafka
+ */
+public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
+
+  private Consumer<K, V> consumer;
+  private ConsumerRecord<K, V> record;
+  private long endingOffset;
+  private Iterator<ConsumerRecord<K, V>> recordIterator;
+  private long consumerPollTimeout;
+  private long maxNumberOfRecords;
+  private long startingOffset;
+  private int maxNumberAttempts;
+
+  /**
+   * Creates the Kafka consumer from the connection properties in the task configuration, assigns it to the topic
+   * partition of the split and positions it at the split's starting offset.
+   *
+   * @param inputSplit the split to read; must be a {@link KafkaInputSplit}.
+   * @param taskAttemptContext supplies the configuration holding the connection, poll timeout and retry settings.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    consumer = new KafkaConsumer<>(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
+    KafkaInputSplit split = (KafkaInputSplit) inputSplit;
+    TopicPartition topicPartition = split.getTopicPartition();
+    consumer.assign(Collections.singletonList(topicPartition));
+    //suggested hack to gather info without gathering data
+    consumer.poll(0);
+    //now seek to the desired start location
+    startingOffset = split.getStartingOffset();
+    consumer.seek(topicPartition, startingOffset);
+
+    endingOffset = split.getEndingOffset();
+
+    maxNumberOfRecords = endingOffset - startingOffset;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Reading data from {} between {} and {}", new Object[]{topicPartition, startingOffset, endingOffset});
+    }
+
+    Configuration config = taskAttemptContext.getConfiguration();
+    consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
+    maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
+  }
+
+  /**
+   * Advances to the next record.
+   *
+   * @return {@code true} if a record was retrieved and its offset is before the ending offset of the split.
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    recordIterator = getRecords();
+    record = recordIterator.hasNext() ? recordIterator.next() : null;
+    if (LOG.isDebugEnabled()) {
+      if (record != null) {
+        LOG.debug("nextKeyValue: Retrieved record with offset {}", record.offset());
+      } else {
+        LOG.debug("nextKeyValue: Retrieved null record");
+      }
+    }
+    return record != null && record.offset() < endingOffset;
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return record == null ? null : record.key();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return record == null ? null : record.value();
+  }
+
+  /**
+   * Estimates the progress from the offset of the current record relative to the offset range of the split.
+   *
+   * @return a value between {@code 0.0} and {@code 1.0}; {@code 0.0} when no record has been read yet or the split
+   * does not span any offsets.
+   */
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    //guard against dividing by zero, which would otherwise yield NaN
+    if (record == null || maxNumberOfRecords <= 0) {
+      return 0.0f;
+    }
+    //not most accurate but gives reasonable estimate
+    float progress = ((float) (record.offset() - startingOffset)) / maxNumberOfRecords;
+    //the current record may sit outside of the split's range, so keep the estimate within [0, 1]
+    return Math.max(0.0f, Math.min(1.0f, progress));
+  }
+
+  /**
+   * Returns the iterator to pull the next record from.  The current iterator is reused while it has records left;
+   * otherwise Kafka is polled again, retrying on {@link RetriableException} up to the configured number of attempts.
+   *
+   * @return the iterator over the retrieved records, empty if no poll was performed.
+   * @throws RetriableException if the last permitted attempt failed.
+   */
+  private Iterator<ConsumerRecord<K, V>> getRecords() {
+    if (recordIterator == null || !recordIterator.hasNext()) {
+      ConsumerRecords<K, V> records = null;
+      int numTries = 0;
+      boolean success = false;
+      while (!success && numTries < maxNumberAttempts) {
+        try {
+          records = consumer.poll(consumerPollTimeout);
+          success = true;
+        } catch (RetriableException re) {
+          numTries++;
+          if (numTries < maxNumberAttempts) {
+            LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re);
+          } else {
+            LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumberAttempts, re);
+            throw re;
+          }
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        if (records == null) {
+          LOG.debug("No records retrieved from Kafka therefore nothing to iterate over.");
+        } else {
+          LOG.debug("Retrieved records from Kafka to iterate over.");
+        }
+      }
+      return records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
+    }
+    return recordIterator;
+  }
+
+  /**
+   * Closes the underlying Kafka consumer if one was created.
+   */
+  @Override
+  public void close() throws IOException {
+    LOG.debug("Closing the record reader.");
+    if (consumer != null) {
+      consumer.close();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
new file mode 100644
index 0000000..836039c
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormatIT;
+import org.apache.crunch.kafka.inputformat.KafkaRecordReaderIT;
+import org.apache.crunch.kafka.utils.KafkaBrokerTestHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * JUnit suite running the Kafka integration tests against a single shared {@link KafkaBrokerTestHarness}.
+ * The static {@link #startTest()} and {@link #endTest()} hooks start and stop the harness only when this class was
+ * not entered through the suite.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    //org.apache.crunch.kafka
+    KafkaSourceIT.class, KafkaRecordsIterableIT.class, KafkaDataIT.class,
+    //org.apache.crunch.kafka.inputformat
+    KafkaRecordReaderIT.class, KafkaInputFormatIT.class, KafkaUtilsIT.class,
+})
+public class ClusterTest {
+
+
+  private static TemporaryFolder folder = new TemporaryFolder();
+  private static KafkaBrokerTestHarness kafka;
+  private static boolean runAsSuite = false;
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void startSuite() throws Exception {
+    runAsSuite = true;
+    startKafka();
+    setupFileSystem();
+  }
+
+  @AfterClass
+  public static void endSuite() throws Exception {
+    try {
+      stopKafka();
+    } finally {
+      cleanupFileSystem();
+    }
+  }
+
+  /**
+   * Starts Kafka and prepares the temporary directory unless the suite already did so.
+   */
+  public static void startTest() throws Exception {
+    if (!runAsSuite) {
+      startKafka();
+      setupFileSystem();
+    }
+  }
+
+  /**
+   * Stops Kafka and removes the temporary directory unless the suite is responsible for doing so.
+   */
+  public static void endTest() throws Exception {
+    if (!runAsSuite) {
+      try {
+        stopKafka();
+      } finally {
+        cleanupFileSystem();
+      }
+    }
+  }
+
+  private static void stopKafka() throws IOException {
+    // The harness is missing when start up failed; do not mask that failure with a NullPointerException.
+    if (kafka != null) {
+      kafka.tearDown();
+    }
+  }
+
+  private static void startKafka() throws IOException {
+    Properties props = new Properties();
+    props.setProperty("auto.create.topics.enable", Boolean.TRUE.toString());
+
+    kafka = new KafkaBrokerTestHarness(props);
+    kafka.setUp();
+  }
+
+  private static void setupFileSystem() throws IOException {
+    folder.create();
+
+    conf = new Configuration();
+    conf.set(RuntimeParameters.TMP_DIR, folder.getRoot().getAbsolutePath());
+    // Run Map/Reduce tests in process.
+    conf.set("mapreduce.jobtracker.address", "local");
+  }
+
+  /**
+   * Removes the temporary directory created by {@link #setupFileSystem()} so test runs do not accumulate files.
+   */
+  private static void cleanupFileSystem() {
+    folder.delete();
+  }
+
+  public static Configuration getConf() {
+    // Clone the configuration so it doesn't get modified for other tests.
+    return new Configuration(conf);
+  }
+
+  /**
+   * Returns the broker properties extended with string deserializers, the bootstrap servers and auto commit
+   * turned off.
+   */
+  public static Properties getConsumerProperties() {
+    Properties props = new Properties();
+    props.putAll(kafka.getProps());
+    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName());
+    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName());
+    //set this because still needed by some APIs.
+    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list"));
+    props.setProperty("enable.auto.commit", Boolean.toString(false));
+
+    //when set this causes some problems with initializing the consumer.
+    props.remove(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    return props;
+  }
+
+  /**
+   * Returns the broker properties extended with the bootstrap servers.
+   */
+  public static Properties getProducerProperties() {
+    Properties props = new Properties();
+    props.putAll(kafka.getProps());
+    //set this because still needed by some APIs.
+    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list"));
+    return props;
+  }
+
+  public static Configuration getConsumerConfig() {
+    Configuration kafkaConfig = new Configuration(conf);
+    KafkaUtils.addKafkaConnectionProperties(getConsumerProperties(), kafkaConfig);
+    return kafkaConfig;
+  }
+
+  /**
+   * Writes {@code loops} batches of {@code numValuesPerLoop} messages to the {@code topic}.  Keys and values are
+   * built as {@code "key" + batch + loop + index} and {@code "value" + batch + loop + index}.
+   *
+   * @param props the base producer properties.
+   * @param topic the topic to write to.
+   * @param batch a label embedded into every key and value.
+   * @param loops the number of batches to send.
+   * @param numValuesPerLoop the number of messages per batch.
+   * @return the keys of all messages handed to the producer, in the order they were created.
+   */
+  public static List<String> writeData(Properties props, String topic, String batch, int loops, int numValuesPerLoop) {
+    Properties producerProps = new Properties();
+    producerProps.putAll(props);
+    producerProps.setProperty("serializer.class", StringEncoderDecoder.class.getName());
+    producerProps.setProperty("key.serializer.class", StringEncoderDecoder.class.getName());
+
+    // Set the default compression used to be snappy
+    producerProps.setProperty("compression.codec", "snappy");
+    producerProps.setProperty("request.required.acks", "1");
+
+    ProducerConfig producerConfig = new ProducerConfig(producerProps);
+
+    Producer<String, String> producer = new Producer<>(producerConfig);
+    List<String> keys = new LinkedList<>();
+    try {
+      for (int i = 0; i < loops; i++) {
+        List<KeyedMessage<String, String>> events = new LinkedList<>();
+        for (int j = 0; j < numValuesPerLoop; j++) {
+          String key = "key" + batch + i + j;
+          String value = "value" + batch + i + j;
+          keys.add(key);
+          events.add(new KeyedMessage<>(topic, key, value));
+        }
+        producer.send(events);
+      }
+    } finally {
+      producer.close();
+    }
+    return keys;
+  }
+
+
+  /**
+   * String serializer and deserializer for the new Kafka client APIs.
+   */
+  public static class StringSerDe implements Serializer<String>, Deserializer<String> {
+
+    @Override
+    public void configure(Map map, boolean b) {
+
+    }
+
+    @Override
+    public byte[] serialize(String topic, String value) {
+      return value.getBytes();
+    }
+
+    @Override
+    public String deserialize(String topic, byte[] bytes) {
+      return new String(bytes);
+    }
+
+    @Override
+    public void close() {
+
+    }
+  }
+
+  /**
+   * String encoder and decoder for the old Kafka producer APIs.
+   */
+  public static class StringEncoderDecoder implements Encoder<String>, Decoder<String> {
+
+    public StringEncoderDecoder() {
+
+    }
+
+    public StringEncoderDecoder(VerifiableProperties props) {
+
+    }
+
+    @Override
+    public String fromBytes(byte[] bytes) {
+      return new String(bytes);
+    }
+
+    @Override
+    public byte[] toBytes(String value) {
+      return value.getBytes();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
new file mode 100644
index 0000000..595a94b
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.crunch.kafka.ClusterTest.writeData;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class KafkaDataIT {
+  /** JUnit rule exposing the running test's method name, which setup() uses as the topic name. */
+  @Rule
+  public TestName testName = new TestName();
+
+  /** Topic written to and read from by the current test; reassigned before every test. */
+  private String topic;
+
+  /** Properties returned by ClusterTest.getConsumerProperties(); reassigned before every test. */
+  private Properties props;
+
+  /** Runs ClusterTest.startTest() once before any test in this class. */
+  @BeforeClass
+  public static void init() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  /** Runs ClusterTest.endTest() once after every test in this class has finished. */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  /** Names the topic after the running test method and fetches the consumer properties. */
+  @Before
+  public void setup() {
+    topic = testName.getMethodName();
+    props = ClusterTest.getConsumerProperties();
+  }
+
+  /**
+   * Writes a known batch of keyed messages, reads the topic back through KafkaData using the
+   * earliest/latest offsets reported for each partition, and verifies that every written key is
+   * read exactly once: each key must still be pending when seen, and none may be left over.
+   */
+  @Test
+  public void getDataIterable() throws IOException {
+    int loops = 10;
+    int numPerLoop = 100;
+    int total = loops * numPerLoop;
+    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+    // Offsets are looked up after the write, presumably so the range spans the new messages.
+    Map<TopicPartition, Long> startOffsets = getStartOffsets(props, topic);
+    Map<TopicPartition, Long> stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    Iterable<Pair<String, String>> data = new KafkaData<String, String>(props, offsets).read(null);
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      // Each key is removed once seen, so a repeated or unknown key fails the hasItem check.
+      assertThat(keys, hasItem(event.first()));
+      assertTrue(keys.remove(event.first()));
+      count++;
+    }
+
+    assertThat(count, is(total));
+    assertThat(keys.size(), is(0));
+  }
+
+  /** Looks up the topic's per-partition offsets for OffsetRequest.LatestTime(). */
+  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
+  }
+
+  /** Looks up the topic's per-partition offsets for OffsetRequest.EarliestTime(). */
+  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
+  }
+}


[3/3] crunch git commit: CRUNCH-606: Handle setting version correctly and removed stray System.out in test.

Posted by mk...@apache.org.
CRUNCH-606: Handle setting version correctly and removed stray System.out in test.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/360d72a4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/360d72a4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/360d72a4

Branch: refs/heads/master
Commit: 360d72a4f887505e020fdb8f99c3ccb1800693f6
Parents: 321cfef
Author: Micah Whitacre <mk...@gmail.com>
Authored: Mon May 23 15:13:02 2016 -0500
Committer: Micah Whitacre <mk...@gmail.com>
Committed: Mon May 23 15:13:55 2016 -0500

----------------------------------------------------------------------
 crunch-kafka/pom.xml                                               | 2 +-
 .../test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java  | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/360d72a4/crunch-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml
index a96a9b0..7d6256b 100644
--- a/crunch-kafka/pom.xml
+++ b/crunch-kafka/pom.xml
@@ -22,7 +22,7 @@ under the License.
   <parent>
     <groupId>org.apache.crunch</groupId>
     <artifactId>crunch-parent</artifactId>
-    <version>0.14.0-SNAPSHOT</version>
+    <version>0.14.1-SNAPSHOT</version>
   </parent>
 
   <artifactId>crunch-kafka</artifactId>

http://git-wip-us.apache.org/repos/asf/crunch/blob/360d72a4/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
index ce97ec1..36f293f 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
@@ -177,7 +177,6 @@ public class KafkaRecordsIterableIT {
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
       offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
-      System.out.println(entry.getKey()+ "start:"+entry.getValue()+":end:"+stopOffsets.get(entry.getKey()));
     }
 
     Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());