Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/11 23:14:26 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1040] HighLevelConsumer re-design by removing references to …

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a328f9  [GOBBLIN-1040] HighLevelConsumer re-design by removing references to …
7a328f9 is described below

commit 7a328f9a232a60973d27c50859e6b84e63df90f7
Author: vbohra <vb...@linkedin.com>
AuthorDate: Wed Mar 11 16:14:19 2020 -0700

    [GOBBLIN-1040] HighLevelConsumer re-design by removing references to …
    
    Closes #2900 from vikrambohra/GOBBLIN-1040
---
 .../kafka/client/Kafka08ConsumerClient.java        |  11 +-
 .../gobblin/service/AvroJobSpecDeserializer.java   |   0
 .../gobblin/metrics/reporter/KafkaTestBase.java    |   1 +
 .../kafka/KafkaDeserializerExtractorTest.java      |   2 +-
 gobblin-modules/gobblin-kafka-09/build.gradle      |  18 +-
 .../kafka/client/Kafka09ConsumerClient.java        |  99 ++++++++-
 .../gobblin/service/AvroJobSpecDeserializer.java   |   0
 .../org/apache/gobblin/kafka/KafkaTestBase.java    |   4 +
 .../gobblin/runtime/HighLevelConsumerTest.java     | 176 +++++++++++++++
 .../gobblin/runtime}/KafkaAvroJobMonitorTest.java  |  18 +-
 .../runtime}/KafkaAvroJobStatusMonitorTest.java    |  66 ++++--
 .../gobblin/runtime/KafkaJobMonitorTest.java       | 127 +++++++++++
 .../runtime}/SLAEventKafkaJobMonitorTest.java      |  41 +++-
 .../service}/GobblinServiceManagerTest.java        |  38 +++-
 .../service/StreamingKafkaSpecExecutorTest.java    |  30 ++-
 .../kafka/client/BaseKafkaConsumerRecord.java      |  12 +
 ....java => GobblinConsumerRebalanceListener.java} |  42 ++--
 .../kafka/client/GobblinKafkaConsumerClient.java   |  49 +++-
 .../gobblin/kafka/client/KafkaConsumerRecord.java  |  19 ++
 .../kafka/writer/KafkaWriterConfigurationKeys.java |   4 +-
 gobblin-runtime/build.gradle                       |   1 -
 .../job_monitor/AvroJobSpecKafkaJobMonitor.java    |   2 +
 .../runtime/job_monitor/KafkaAvroJobMonitor.java   |   3 +-
 .../runtime/job_monitor/KafkaJobMonitor.java       |  12 +-
 .../gobblin/runtime/kafka/HighLevelConsumer.java   | 246 +++++++++++++++------
 .../runtime/job_monitor/KafkaJobMonitorTest.java   |  65 ------
 .../runtime/job_monitor/MockKafkaStream.java       | 106 ---------
 .../runtime/job_monitor/MockedKafkaJobMonitor.java |  28 +--
 .../runtime/kafka/HighLevelConsumerTest.java       |  82 -------
 .../runtime/kafka/MockedHighLevelConsumer.java     |  53 ++---
 gobblin-service/build.gradle                       |   4 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  10 +-
 32 files changed, 895 insertions(+), 474 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
index 2ed5b70..8d9efbc 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
@@ -247,6 +247,11 @@ public class Kafka08ConsumerClient extends AbstractBaseKafkaConsumerClient {
     }
   }
 
+  @Override
+  public Iterator<KafkaConsumerRecord> consume() {
+    throw new UnsupportedOperationException("consume() not supported by " + this.getClass().getSimpleName() + ". Please use Kafka09ConsumerClient or above.");
+  }
+
   private synchronized FetchResponse getFetchResponseForFetchRequest(FetchRequest fetchRequest, KafkaPartition partition) {
     SimpleConsumer consumer = getSimpleConsumer(partition.getLeader().getHostAndPort());
 
@@ -265,7 +270,7 @@ public class Kafka08ConsumerClient extends AbstractBaseKafkaConsumerClient {
           new Function<kafka.message.MessageAndOffset, KafkaConsumerRecord>() {
             @Override
             public KafkaConsumerRecord apply(kafka.message.MessageAndOffset input) {
-              return new Kafka08ConsumerRecord(input);
+              return new Kafka08ConsumerRecord(input, partition.getTopicName(), partition.getId());
             }
           });
     } catch (Exception e) {
@@ -345,8 +350,8 @@ public class Kafka08ConsumerClient extends AbstractBaseKafkaConsumerClient {
 
     private final MessageAndOffset messageAndOffset;
 
-    public Kafka08ConsumerRecord(MessageAndOffset messageAndOffset) {
-      super(messageAndOffset.offset(), messageAndOffset.message().size());
+    public Kafka08ConsumerRecord(MessageAndOffset messageAndOffset, String topic, int partition) {
+      super(messageAndOffset.offset(), messageAndOffset.message().size(), topic, partition);
       this.messageAndOffset = messageAndOffset;
     }
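
Note that the 0.8 client above deliberately leaves the new no-arg consume() unimplemented. A caller that must work against both client versions can probe for support; a minimal sketch, assuming only the UnsupportedOperationException contract shown above (the ConsumeSupportGuard class and tryConsume method names are illustrative, not part of this commit):

import java.util.Iterator;

import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;

public class ConsumeSupportGuard {
  // Returns records from the streaming consume() API, or null when the client
  // (e.g. Kafka08ConsumerClient) only supports per-partition consume(partition, nextOffset, maxOffset).
  public static Iterator<KafkaConsumerRecord> tryConsume(GobblinKafkaConsumerClient client) {
    try {
      return client.consume();
    } catch (UnsupportedOperationException e) {
      return null;
    }
  }
}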
 
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
similarity index 100%
copy from gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
copy to gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaTestBase.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaTestBase.java
index 11aa603..65040d0 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaTestBase.java
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaTestBase.java
@@ -66,6 +66,7 @@ public class KafkaTestBase implements Closeable {
     if (serverStarted && serverClosed) {
       throw new RuntimeException("Kafka test server has already been closed. Cannot generate Kafka server twice.");
     }
+
     if (!serverStarted) {
       serverStarted = true;
       zkConnect = TestZKUtils.zookeeperConnect();
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
index 1a273ec..06b06e0 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
@@ -297,7 +297,7 @@ public class KafkaDeserializerExtractorTest {
     Message mockMessage = mock(Message.class);
     when(mockMessage.payload()).thenReturn(payload);
     when(mockMessageAndOffset.message()).thenReturn(mockMessage);
-    return new Kafka08ConsumerRecord(mockMessageAndOffset);
+    return new Kafka08ConsumerRecord(mockMessageAndOffset, "test", 0);
   }
 
   @AllArgsConstructor
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle b/gobblin-modules/gobblin-kafka-09/build.gradle
index 5978d96..b33eb55 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -18,6 +18,7 @@
 apply plugin: 'java'
 
 dependencies {
+  compile project(":gobblin-runtime")
   compile project(":gobblin-modules:gobblin-kafka-common")
   compile project(":gobblin-core-base")
   compile project(":gobblin-utility")
@@ -49,7 +50,10 @@ dependencies {
   runtime externalDependency.confluentSchemaRegistryClient
   runtime externalDependency.protobuf
 
-  testCompile project(":gobblin-runtime")
+  testCompile project(":gobblin-service")
+  testCompile project(":gobblin-modules:gobblin-service-kafka")
+  testCompile project(path: ":gobblin-runtime", configuration: "tests")
+  testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
   testCompile project(":gobblin-test-utils")
   testCompile externalDependency.jsonAssert
   testCompile externalDependency.mockito
@@ -59,6 +63,8 @@ dependencies {
     exclude group: "com.sun.jdmk", module: "jmxtools"
     exclude group: "javax.jms", module: "jms"
   }
+  testCompile externalDependency.pegasus.data
+  testCompile externalDependency.pegasus.restliClient
 }
 
 configurations {
@@ -68,6 +74,16 @@ configurations {
   // HADOOP-5254 and MAPREDUCE-5664
   all*.exclude group: 'xml-apis'
   all*.exclude group: 'xerces'
+  tests
+}
+
+task testJar(type: Jar, dependsOn: testClasses) {
+  baseName = "test-${project.archivesBaseName}"
+  from sourceSets.test.output
+}
+
+artifacts {
+  tests testJar
 }
 
 test {
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index 97581c6..931cf3d 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.kafka.client;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -24,11 +25,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -39,6 +46,7 @@ import com.codahale.metrics.Metric;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
@@ -48,6 +56,7 @@ import com.typesafe.config.ConfigFactory;
 
 import javax.annotation.Nonnull;
 import lombok.EqualsAndHashCode;
+import lombok.Getter;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
@@ -76,7 +85,7 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
   private static final String KAFKA_09_CLIENT_GROUP_ID = "group.id";
 
   private static final String KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false);
-  private static final String KAFKA_09_DEFAULT_KEY_DESERIALIZER =
+  public static final String KAFKA_09_DEFAULT_KEY_DESERIALIZER =
       "org.apache.kafka.common.serialization.StringDeserializer";
   private static final String KAFKA_09_DEFAULT_GROUP_ID = "kafka09";
 
@@ -160,12 +169,53 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
 
     this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
     this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
-    ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
-    return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
+    return consume();
+  }
+
+  @Override
+  public Iterator<KafkaConsumerRecord> consume() {
+    try {
+      ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
+
+      return Iterators.transform(consumerRecords.iterator(), input -> {
+        try {
+          return new Kafka09ConsumerRecord(input);
+        } catch (Throwable t) {
+          throw Throwables.propagate(t);
+        }
+      });
+    } catch (Exception e) {
+      log.error("Exception on polling records", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Subscribe to a Kafka topic.
+   * TODO Add multi-topic support
+   * @param topic
+   */
+  @Override
+  public void subscribe(String topic) {
+    this.consumer.subscribe(Lists.newArrayList(topic), new NoOpConsumerRebalanceListener());
+  }
 
+  /**
+   * Subscribe to a Kafka topic with a {@link GobblinConsumerRebalanceListener}.
+   * TODO Add multi-topic support
+   * @param topic
+   */
+  @Override
+  public void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
+    this.consumer.subscribe(Lists.newArrayList(topic), new ConsumerRebalanceListener() {
       @Override
-      public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
-        return new Kafka09ConsumerRecord<>(input);
+      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        listener.onPartitionsRevoked(partitions.stream().map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build()).collect(Collectors.toList()));
+      }
+
+      @Override
+      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        listener.onPartitionsAssigned(partitions.stream().map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build()).collect(Collectors.toList()));
       }
     });
   }
@@ -181,6 +231,43 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
   }
 
   /**
+   * Commit offsets to Kafka asynchronously
+   */
+  @Override
+  public void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
+    Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(),e.getKey().getId()), e -> new OffsetAndMetadata(e.getValue())));
+    consumer.commitAsync(offsets, new OffsetCommitCallback() {
+      @Override
+      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+        if(exception != null) {
+          log.error("Exception while committing offsets " + offsets, exception);
+          return;
+        }
+      }
+    });
+  }
+
+  /**
+   * Commit offsets to Kafka synchronously
+   */
+  @Override
+  public void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
+    Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(),e.getKey().getId()), e -> new OffsetAndMetadata(e.getValue())));
+    consumer.commitSync(offsets);
+  }
+
+  /**
+   * Returns the last committed offset for a KafkaPartition.
+   * @param partition
+   * @return the last committed offset, or -1 for an invalid KafkaPartition
+   */
+  @Override
+  public long committed(KafkaPartition partition) {
+    OffsetAndMetadata offsetAndMetadata = consumer.committed(new TopicPartition(partition.getTopicName(), partition.getId()));
+    return offsetAndMetadata != null ? offsetAndMetadata.offset() : -1L;
+  }
+
+  /**
    * Convert a {@link KafkaMetric} instance to a {@link Metric}.
    * @param kafkaMetric
    * @return
@@ -240,7 +327,7 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
     public Kafka09ConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
       // Kafka 09 consumerRecords do not provide value size.
       // Only 08 and 10 versions provide them.
-      super(consumerRecord.offset(), BaseKafkaConsumerRecord.VALUE_SIZE_UNAVAILABLE);
+      super(consumerRecord.offset(), BaseKafkaConsumerRecord.VALUE_SIZE_UNAVAILABLE, consumerRecord.topic(), consumerRecord.partition());
       this.consumerRecord = consumerRecord;
     }
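
Taken together, the methods added above (subscribe, consume, commitOffsetsAsync/commitOffsetsSync, committed) let a caller run a group-managed poll-and-commit loop instead of the assign/seek pattern used by the per-partition consume(). A minimal sketch of such a loop, assuming the signatures shown in this diff; the PollLoopSketch class name and the record accessors getTopic()/getPartition()/getNextOffset() are assumptions for illustration:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;

public class PollLoopSketch {
  public static void pollOnce(GobblinKafkaConsumerClient client, String topic) {
    // Group-managed subscription; subscribe(topic, listener) is the overload that
    // bridges to a GobblinConsumerRebalanceListener.
    client.subscribe(topic);

    // One poll of the underlying consumer, wrapped as Gobblin records.
    Iterator<KafkaConsumerRecord> records = client.consume();

    Map<KafkaPartition, Long> offsetsToCommit = new HashMap<>();
    while (records.hasNext()) {
      KafkaConsumerRecord record = records.next();
      // ... process the record ...
      KafkaPartition partition = new KafkaPartition.Builder()
          .withTopicName(record.getTopic())      // accessor names assumed
          .withId(record.getPartition())
          .build();
      offsetsToCommit.put(partition, record.getNextOffset()); // commit the offset past the consumed record
    }

    // Non-blocking commit; commitOffsetsSync(offsetsToCommit) is the blocking variant for shutdown paths.
    client.commitOffsetsAsync(offsetsToCommit);
  }
}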
 
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
similarity index 100%
rename from gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
rename to gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
index f99680c..2eb3fa7 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
@@ -265,5 +265,9 @@ public class KafkaTestBase implements Closeable {
   public int getKafkaServerPort() {
     return _kafkaServerSuite.getKafkaServerPort();
   }
+
+  public String getZkConnectString() {
+    return this._kafkaServerSuite.getZkConnectString();
+  }
 }
 
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
new file mode 100644
index 0000000..75c667b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.io.File;
+import java.util.List;
+import java.util.Properties;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
+import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.kafka.MockedHighLevelConsumer;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.WriteCallback;
+
+
+@Test
+public class HighLevelConsumerTest extends KafkaTestBase {
+
+  private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+  private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
+  private static final String SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT = AbstractBaseKafkaConsumerClient.CONFIG_NAMESPACE + "." + AbstractBaseKafkaConsumerClient.CONSUMER_CONFIG + ".";
+  private static final String TOPIC = HighLevelConsumerTest.class.getSimpleName();
+  private static final int NUM_PARTITIONS = 2;
+  private static final int NUM_MSGS = 10;
+
+  private Closer _closer;
+  private String _kafkaBrokers;
+  private AsyncDataWriter dataWriter;
+
+  public HighLevelConsumerTest()
+      throws InterruptedException, RuntimeException {
+    super();
+    _kafkaBrokers = "localhost:" + this.getKafkaServerPort();
+  }
+
+  @BeforeSuite
+  public void beforeSuite() throws Exception {
+    startServers();
+    _closer = Closer.create();
+    Properties producerProps = new Properties();
+    producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
+    producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + BOOTSTRAP_SERVERS_KEY, _kafkaBrokers);
+    producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+    producerProps.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, this.getZkConnectString());
+    producerProps.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(NUM_PARTITIONS));
+    dataWriter = _closer.register(new Kafka09DataWriter(producerProps));
+
+    List<byte[]> records = createByteArrayMessages(NUM_MSGS);
+    WriteCallback mock = Mockito.mock(WriteCallback.class);
+    for(byte[] record : records) {
+      dataWriter.write(record, mock);
+    }
+    dataWriter.flush();
+  }
+
+  public static Config getSimpleConfig(Optional<String> prefix) {
+    Properties properties = new Properties();
+    properties.put(getConfigKey(prefix, ConfigurationKeys.KAFKA_BROKERS), "localhost:1234");
+    properties.put(getConfigKey(prefix, Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY), Kafka09ConsumerClient.KAFKA_09_DEFAULT_KEY_DESERIALIZER);
+    properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper");
+    properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true");
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    properties.put(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, tmpDir.toString());
+
+    return ConfigFactory.parseProperties(properties);
+  }
+
+  private static String getConfigKey(Optional<String> prefix, String key) {
+    return prefix.isPresent() ? prefix.get() + "." + key : key;
+  }
+
+  @Test
+  public void testConsumerAutoOffsetCommit() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS);
+    consumer.startAsync().awaitRunning();
+
+    consumer.awaitExactlyNMessages(NUM_MSGS, 5000);
+    consumer.shutDown();
+  }
+
+  @Test
+  public void testConsumerManualOffsetCommit() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+
+    // Setting this to one second to make sure offsets are committed frequently
+    consumerProps.put(HighLevelConsumer.OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY, 1);
+
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps),
+        NUM_PARTITIONS);
+    consumer.startAsync().awaitRunning();
+
+    consumer.awaitExactlyNMessages(NUM_MSGS, 5000);
+    // Give the periodic offset commit (threshold set to one second above) time to fire.
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    GobblinKafkaConsumerClient client  = consumer.getGobblinKafkaConsumerClient();
+    for(int i=0; i< NUM_PARTITIONS; i++) {
+      KafkaPartition partition = new KafkaPartition.Builder().withTopicName(TOPIC).withId(i).build();
+      Assert.assertTrue(consumer.getCommittedOffsets().containsKey(partition));
+    }
+    consumer.shutDown();
+  }
+
+  private List<byte[]> createByteArrayMessages(int numMsgs) {
+    List<byte[]> records = Lists.newArrayList();
+
+    for(int i=0; i<numMsgs; i++) {
+      byte[] msg = ("msg_" + i).getBytes();
+      records.add(msg);
+    }
+    return records;
+  }
+
+  @AfterSuite
+  public void afterSuite() {
+    try {
+      _closer.close();
+    } catch (Exception e) {
+      System.out.println("Failed to close data writer. " + e);
+    } finally {
+      try {
+        close();
+      } catch (Exception e) {
+        System.out.println("Failed to close Kafka server. " + e);
+      }
+    }
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
similarity index 92%
rename from gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitorTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
index 416e237..2e63cc2 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.job_monitor;
+package org.apache.gobblin.runtime;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.List;
@@ -38,7 +39,8 @@ import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.NoopSchemaVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
+import org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
 import org.apache.gobblin.util.Either;
 
 
@@ -134,6 +136,18 @@ public class KafkaAvroJobMonitorTest {
       this.events.add(message);
       return Lists.newArrayList(Either.<JobSpec, URI>left(JobSpec.builder(message.getName()).build()));
     }
+
+    @Override
+    protected void buildMetricsContextAndMetrics() {
+      super.buildMetricsContextAndMetrics();
+    }
+
+    @Override
+    protected void shutdownMetrics()
+        throws IOException {
+      super.shutdownMetrics();
+    }
+
   }
 
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
similarity index 84%
rename from gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index eb68cf4..05dc4c3 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.service.monitoring;
+package org.apache.gobblin.runtime;
 
 import java.io.File;
 import java.io.IOException;
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -43,6 +44,8 @@ import kafka.message.MessageAndMetadata;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
@@ -52,6 +55,9 @@ import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
 import org.apache.gobblin.metrics.kafka.Pusher;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
 
 
 public class KafkaAvroJobStatusMonitorTest {
@@ -123,14 +129,16 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
         .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
-    KafkaJobStatusMonitor jobStatusMonitor =  new KafkaAvroJobStatusMonitor("test",config, 1);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test", config, 1);
 
     ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
 
     MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     StateStore stateStore = jobStatusMonitor.getStateStore();
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
@@ -141,7 +149,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
 
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
     stateList  = stateStore.getAll(storeName, tableName);
@@ -150,7 +158,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
 
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
@@ -158,7 +166,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
 
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
@@ -170,12 +178,14 @@ public class KafkaAvroJobStatusMonitorTest {
 
     // Check that state didn't get set to running since it was already complete
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
     state = stateList.get(0);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+
+    jobStatusMonitor.shutDown();
   }
 
   @Test
@@ -222,14 +232,16 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
         .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
-    KafkaJobStatusMonitor jobStatusMonitor =  new KafkaAvroJobStatusMonitor("test",config, 1);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test", config, 1);
 
     ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
 
     MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     StateStore stateStore = jobStatusMonitor.getStateStore();
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
@@ -240,7 +252,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
 
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
     stateList  = stateStore.getAll(storeName, tableName);
@@ -249,7 +261,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
 
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
@@ -257,7 +269,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
 
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
@@ -267,7 +279,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
 
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
@@ -276,7 +288,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
 
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
@@ -285,7 +297,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
 
     messageAndMetadata = iterator.next();
-    jobStatusMonitor.processMessage(messageAndMetadata);
+    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     stateList  = stateStore.getAll(storeName, tableName);
     Assert.assertEquals(stateList.size(), 1);
@@ -293,6 +305,8 @@ public class KafkaAvroJobStatusMonitorTest {
     //Because the maximum number of attempts is set to 2, the state is set to FAILED after trying twice
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.FAILED.name());
     Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(false));
+
+    jobStatusMonitor.shutDown();
   }
 
   private GobblinTrackingEvent createFlowCompiledEvent() {
@@ -405,6 +419,11 @@ public class KafkaAvroJobStatusMonitorTest {
     return event;
   }
 
+  private DecodeableKafkaRecord convertMessageAndMetadataToDecodableKafkaRecord(MessageAndMetadata messageAndMetadata) {
+    ConsumerRecord consumerRecord = new ConsumerRecord<>(TOPIC, messageAndMetadata.partition(), messageAndMetadata.offset(), messageAndMetadata.key(), messageAndMetadata.message());
+    return new Kafka09ConsumerClient.Kafka09ConsumerRecord(consumerRecord);
+  }
+
   private void cleanUpDir(String dir) throws Exception {
     File specStoreDir = new File(dir);
     if (specStoreDir.exists()) {
@@ -429,4 +448,17 @@ public class KafkaAvroJobStatusMonitorTest {
       System.err.println("Failed to close Kafka server.");
     }
   }
+
+  class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor {
+
+    public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
+        throws IOException, ReflectiveOperationException {
+      super(topic, config, numThreads);
+    }
+
+    @Override
+    protected void processMessage(DecodeableKafkaRecord record) {
+      super.processMessage(record);
+    }
+  }
 }
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaJobMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaJobMonitorTest.java
new file mode 100644
index 0000000..f47bae4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaJobMonitorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Closer;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
+import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
+import org.apache.gobblin.runtime.job_monitor.MockedKafkaJobMonitor;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.WriteCallback;
+
+
+public class KafkaJobMonitorTest extends KafkaTestBase {
+
+  private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+  private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
+  private static final String SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT = AbstractBaseKafkaConsumerClient.CONFIG_NAMESPACE + "." + AbstractBaseKafkaConsumerClient.CONSUMER_CONFIG + ".";
+  private static final String TOPIC = KafkaJobMonitorTest.class.getSimpleName();
+  private static final int NUM_PARTITIONS = 2;
+
+  private Closer _closer;
+  private String _kafkaBrokers;
+  private AsyncDataWriter dataWriter;
+
+  public KafkaJobMonitorTest()
+      throws InterruptedException, RuntimeException {
+    super();
+    _kafkaBrokers = "localhost:" + this.getKafkaServerPort();
+  }
+
+  @BeforeSuite
+  public void beforeSuite() throws Exception {
+    startServers();
+    _closer = Closer.create();
+    Properties producerProps = new Properties();
+    producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
+    producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + BOOTSTRAP_SERVERS_KEY, _kafkaBrokers);
+    producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+    producerProps.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, this.getZkConnectString());
+    producerProps.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(NUM_PARTITIONS));
+    dataWriter = _closer.register(new Kafka09DataWriter(producerProps));
+  }
+
+  @Test
+  public void test() throws Exception {
+
+    Properties consumerProps = new Properties();
+    consumerProps.put(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.put(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+
+    MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(TOPIC, ConfigUtils.propertiesToConfig(consumerProps));
+    monitor.startAsync().awaitRunning();
+
+    WriteCallback mockCallback = Mockito.mock(WriteCallback.class);
+    dataWriter.write("job1:1".getBytes(), mockCallback);
+    monitor.awaitExactlyNSpecs(1);
+    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job1")));
+    Assert.assertEquals(monitor.getJobSpecs().get(new URI("job1")).getVersion(), "1");
+
+    dataWriter.write("job2:1".getBytes(), mockCallback);
+    monitor.awaitExactlyNSpecs(2);
+    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
+    Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "1");
+
+    dataWriter.write((MockedKafkaJobMonitor.REMOVE + ":job1").getBytes(), mockCallback);
+    monitor.awaitExactlyNSpecs(1);
+    Assert.assertFalse(monitor.getJobSpecs().containsKey(new URI("job1")));
+    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
+
+    dataWriter.write(("job2:2,job1:2").getBytes(), mockCallback);
+    monitor.awaitExactlyNSpecs(2);
+    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job1")));
+    Assert.assertEquals(monitor.getJobSpecs().get(new URI("job1")).getVersion(), "2");
+    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
+    Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "2");
+
+    monitor.shutDown();
+  }
+
+  @AfterSuite
+  public void afterSuite() {
+    try {
+      _closer.close();
+    } catch (Exception e) {
+      System.out.println("Failed to close data writer. " + e);
+    } finally {
+      try {
+        close();
+      } catch (Exception e) {
+        System.out.println("Failed to close Kafka server. " + e);
+      }
+    }
+  }
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
similarity index 83%
rename from gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitorTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
index 5c278c9..74564b7 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.job_monitor;
+package org.apache.gobblin.runtime;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Map;
@@ -36,8 +37,11 @@ import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.NoopSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
+import org.apache.gobblin.runtime.job_monitor.SLAEventKafkaJobMonitor;
 import org.apache.gobblin.util.Either;
 
 
@@ -54,8 +58,8 @@ public class SLAEventKafkaJobMonitorTest {
   @Test
   public void testParseJobSpec() throws Exception {
 
-    SLAEventKafkaJobMonitor monitor =
-        new SLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
+    MockSLAEventKafkaJobMonitor monitor =
+        new MockSLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
             HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)),
             new NoopSchemaVersionWriter(), Optional.<Pattern>absent(), Optional.<Pattern>absent(), this.templateURI,
             ImmutableMap.of("metadataKey1", "key1"));
@@ -79,8 +83,8 @@ public class SLAEventKafkaJobMonitorTest {
   @Test
   public void testFilterByName() throws Exception {
 
-    SLAEventKafkaJobMonitor monitor =
-        new SLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
+    MockSLAEventKafkaJobMonitor monitor =
+        new MockSLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
             HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)),
             new NoopSchemaVersionWriter(), Optional.<Pattern>absent(), Optional.of(Pattern.compile("^accept.*")),
             this.templateURI, ImmutableMap.<String, String>of());
@@ -110,8 +114,8 @@ public class SLAEventKafkaJobMonitorTest {
     props.put(SLAEventKafkaJobMonitor.DATASET_URN_FILTER_KEY, "^/accept.*");
     Config config = ConfigFactory.parseProperties(props).withFallback(superConfig);
 
-    SLAEventKafkaJobMonitor monitor =
-        new SLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
+    MockSLAEventKafkaJobMonitor monitor =
+        new MockSLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
             HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)),
             new NoopSchemaVersionWriter(), Optional.of(Pattern.compile("^/accept.*")), Optional.<Pattern>absent(),
             this.templateURI, ImmutableMap.<String, String>of());
@@ -171,4 +175,25 @@ public class SLAEventKafkaJobMonitorTest {
     return new GobblinTrackingEvent(0L, "namespace", name, metadata);
   }
 
+  class MockSLAEventKafkaJobMonitor extends SLAEventKafkaJobMonitor {
+
+    protected MockSLAEventKafkaJobMonitor(String topic, MutableJobCatalog catalog, URI baseURI,
+        Config limitedScopeConfig, SchemaVersionWriter<?> versionWriter, Optional<Pattern> urnFilter,
+        Optional<Pattern> nameFilter, URI template, Map<String, String> extractKeys)
+        throws IOException {
+      super(topic, catalog, baseURI, limitedScopeConfig, versionWriter, urnFilter, nameFilter, template, extractKeys);
+    }
+
+    @Override
+    protected void buildMetricsContextAndMetrics() {
+      super.buildMetricsContextAndMetrics();
+    }
+
+    @Override
+    protected void shutdownMetrics()
+        throws IOException {
+      super.shutdownMetrics();
+    }
+  }
+
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
similarity index 93%
rename from gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 7aecfb1..125c2bf 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service;
 
 import java.io.File;
 import java.net.URI;
@@ -25,6 +25,8 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
+
+import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
@@ -47,6 +49,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.client.RestLiResponseException;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.KafkaTestBase;
@@ -55,11 +58,8 @@ import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.service.FlowConfig;
-import org.apache.gobblin.service.FlowConfigClient;
-import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.Schedule;
-import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GitConfigMonitor;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -89,7 +89,7 @@ public class GobblinServiceManagerTest {
   private static final String TEST_SOURCE_NAME = "testSource";
   private static final String TEST_SINK_NAME = "testSink";
 
-  private GobblinServiceManager gobblinServiceManager;
+  private MockGobblinServiceManager gobblinServiceManager;
   private FlowConfigClient flowConfigClient;
 
   private Git gitForPush;
@@ -143,12 +143,12 @@ public class GobblinServiceManagerTest {
     this.gitForPush.commit().setMessage("First commit").call();
     this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call();
 
-    this.gobblinServiceManager = new GobblinServiceManager("CoreService", "1",
+    this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1",
         ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
     this.gobblinServiceManager.start();
 
     this.flowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/",
-        this.gobblinServiceManager.restliServer.getPort()));
+        this.gobblinServiceManager.getRestLiServer().getPort()));
   }
 
   private void cleanUpDir(String dir) throws Exception {
@@ -192,7 +192,7 @@ public class GobblinServiceManagerTest {
         .setProperties(new StringMap(flowProperties));
 
     this.flowConfigClient.createFlowConfig(flowConfig);
-    Assert.assertTrue(this.gobblinServiceManager.flowCatalog.getSpecs().size() == 1, "Flow that was created is not "
+    Assert.assertTrue(this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1, "Flow that was created is not "
         + "reflecting in FlowCatalog");
   }
 
@@ -289,7 +289,7 @@ public class GobblinServiceManagerTest {
 
     Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n", testFlowFile, Charsets.UTF_8);
 
-    Collection<Spec> specs = this.gobblinServiceManager.flowCatalog.getSpecs();
+    Collection<Spec> specs = this.gobblinServiceManager.getFlowCatalog().getSpecs();
     Assert.assertTrue(specs.size() == 0);
 
     // add, commit, push
@@ -300,7 +300,7 @@ public class GobblinServiceManagerTest {
     // polling is every 5 seconds, so wait twice as long and check
     TimeUnit.SECONDS.sleep(10);
 
-    specs = this.gobblinServiceManager.flowCatalog.getSpecs();
+    specs = this.gobblinServiceManager.getFlowCatalog().getSpecs();
     Assert.assertTrue(specs.size() == 1);
 
     FlowSpec spec = (FlowSpec) (specs.iterator().next());
@@ -356,4 +356,18 @@ public class GobblinServiceManagerTest {
     }
     cleanUpDir(FLOW_SPEC_STORE_DIR);
   }
+
+  class MockGobblinServiceManager extends GobblinServiceManager {
+
+    public MockGobblinServiceManager(String serviceName, String serviceId, Config config,
+        Optional<Path> serviceWorkDirOptional)
+        throws Exception {
+      super(serviceName, serviceId, config, serviceWorkDirOptional);
+    }
+
+    protected EmbeddedRestliServer getRestLiServer() {
+      return this.restliServer;
+    }
+
+  }
 }
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
similarity index 86%
rename from gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
index e9c7ee6..26d3e3b 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -27,18 +28,21 @@ import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
-import org.apache.gobblin.metrics.reporter.KafkaTestBase;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.WriteResponse;
 import org.apache.gobblin.runtime.api.SpecExecutor;
@@ -60,10 +64,16 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
   private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
   private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
 
+  @BeforeSuite
+  public void beforeSuite() {
+    log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+    startServers();
+  }
+
   public StreamingKafkaSpecExecutorTest()
       throws InterruptedException, RuntimeException {
-    super(TOPIC);
-    _kafkaBrokers = "localhost:" + kafkaPort;
+    super();
+    _kafkaBrokers = "localhost:" + this.getKafkaServerPort();
     log.info("Going to use Kakfa broker: " + _kafkaBrokers);
 
     cleanupTestDir();
@@ -88,13 +98,16 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
 
     // Properties for Producer
     _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
+    _properties.setProperty("spec.kafka.dataWriterClass", "org.apache.gobblin.kafka.writer.Kafka09DataWriter");
     _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers);
     _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
 
     // Properties for Consumer
-    _properties.setProperty("jobSpecMonitor.kafka.zookeeper.connect", zkConnect);
+    _properties.setProperty(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    _properties.setProperty(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
     _properties.setProperty(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY, TOPIC);
     _properties.setProperty("gobblin.cluster.jobconf.fullyQualifiedPath", _JOBS_DIR_PATH);
+    _properties.setProperty(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + Kafka09ConsumerClient.CONFIG_PREFIX + Kafka09ConsumerClient.CONSUMER_CONFIG + ".auto.offset.reset", "earliest");
 
     Config config = ConfigUtils.propertiesToConfig(_properties);
 
@@ -164,7 +177,7 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
         .build();
   }
 
-  @AfterClass
+  @AfterSuite
   public void after() {
     try {
       _closer.close();
@@ -183,9 +196,4 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
 
     cleanupTestDir();
   }
-
-  @AfterSuite
-  public void afterSuite() {
-    closeServer();
-  }
 }
\ No newline at end of file
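
For reference, the 0.9-style consumer wiring used by this test boils down to three
job-monitor-prefixed properties. The sketch below is illustrative only: the class and
method names are hypothetical, while the property keys are exactly the ones set in the
hunk above.

    import java.util.Properties;

    import com.typesafe.config.Config;

    import org.apache.gobblin.configuration.ConfigurationKeys;
    import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
    import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
    import org.apache.gobblin.util.ConfigUtils;

    public class JobMonitorConsumerConfigSketch {
      /** Builds a minimal job-monitor consumer Config for a 0.9+ broker. */
      public static Config forBrokers(String brokers) {
        Properties props = new Properties();
        String prefix = KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + ".";
        // Brokers replace the 0.8-era "jobSpecMonitor.kafka.zookeeper.connect" key.
        props.setProperty(prefix + ConfigurationKeys.KAFKA_BROKERS, brokers);
        // Values are handed to the monitor as raw bytes and decoded there.
        props.setProperty(prefix + Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Start from the beginning of the topic when no committed offset exists.
        props.setProperty(prefix + Kafka09ConsumerClient.CONFIG_PREFIX
            + Kafka09ConsumerClient.CONSUMER_CONFIG + ".auto.offset.reset", "earliest");
        return ConfigUtils.propertiesToConfig(props);
      }
    }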
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
index 017a166..65f84be 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
@@ -30,6 +30,8 @@ public abstract class BaseKafkaConsumerRecord implements KafkaConsumerRecord {
 
   private final long offset;
   private final long valueSizeInBytes;
+  private final String topic;
+  private final int partitionId;
   public static final long VALUE_SIZE_UNAVAILABLE = -1l;
 
   @Override
@@ -46,4 +48,14 @@ public abstract class BaseKafkaConsumerRecord implements KafkaConsumerRecord {
   public long getValueSizeInBytes() {
     return this.valueSizeInBytes;
   }
+
+  @Override
+  public int getPartition() {
+    return this.partitionId;
+  }
+
+  @Override
+  public String getTopic() {
+    return this.topic;
+  }
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinConsumerRebalanceListener.java
similarity index 56%
copy from gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
copy to gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinConsumerRebalanceListener.java
index 017a166..334dcd4 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinConsumerRebalanceListener.java
@@ -14,36 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.gobblin.kafka.client;
 
-import lombok.AllArgsConstructor;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
+import java.util.Collection;
+
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+
 
 /**
- * A base {@link KafkaConsumerRecord} that with offset and valueSizeInBytes
+ * A listener that is called when Kafka partitions are re-assigned, e.g. when a consumer leaves or joins a group.
+ *
+ * For more details, see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
  */
-@AllArgsConstructor
-@EqualsAndHashCode
-@ToString
-public abstract class BaseKafkaConsumerRecord implements KafkaConsumerRecord {
-
-  private final long offset;
-  private final long valueSizeInBytes;
-  public static final long VALUE_SIZE_UNAVAILABLE = -1l;
-
-  @Override
-  public long getOffset() {
-    return this.offset;
-  }
-
-  @Override
-  public long getNextOffset() {
-    return this.offset + 1l;
-  }
-
-  @Override
-  public long getValueSizeInBytes() {
-    return this.valueSizeInBytes;
-  }
+
+public interface GobblinConsumerRebalanceListener {
+
+  void onPartitionsRevoked(Collection<KafkaPartition> partitions);
+
+  void onPartitionsAssigned(Collection<KafkaPartition> partitions);
+
 }
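
A minimal implementation makes the contract concrete. The logging listener below is a
hypothetical example; HighLevelConsumer's real listener (later in this commit) additionally
commits and clears its pending offsets when partitions are revoked.

    import java.util.Collection;

    import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
    import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;

    public class LoggingRebalanceListener implements GobblinConsumerRebalanceListener {

      @Override
      public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
        // Last chance to flush any in-flight per-partition state before reassignment.
        System.out.println("Revoked " + partitions.size() + " partitions");
      }

      @Override
      public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
        System.out.println("Assigned " + partitions.size() + " partitions");
      }
    }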
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index 4026f33..684a48c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.kafka.client;
 
 import java.io.Closeable;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -94,7 +95,7 @@ public interface GobblinKafkaConsumerClient extends Closeable {
 
   /**
    * API to consume records from kafka starting from <code>nextOffset</code> till <code>maxOffset</code>.
-   * If <code>maxOffset</code> is greater than <code>nextOffset</code>, returns a null.
+   * If <code>nextOffset</code> is greater than <code>maxOffset</code>, returns null.
    * <code>nextOffset</code>
    * <p>
    *  <b>NOTE:</b> If the underlying kafka-client version does not support
@@ -110,6 +111,29 @@ public interface GobblinKafkaConsumerClient extends Closeable {
   public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset);
 
   /**
+   * API to consume records from kafka
+   * @return an iterator over the consumed {@link KafkaConsumerRecord}s; empty by default
+   */
+  default Iterator<KafkaConsumerRecord> consume() {
+    return Collections.emptyIterator();
+  }
+
+  /**
+   * Subscribe to a topic
+   * @param topic
+   */
+  default void subscribe(String topic) {
+    return;
+  }
+
+  /**
+   * Subscribe to a topic along with a {@link GobblinConsumerRebalanceListener}
+   * @param topic
+   */
+  default void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
+    return;
+  }
+  /**
    * API to return underlying Kafka consumer metrics. The individual implementations must translate
    * org.apache.kafka.common.Metric to Coda Hale Metrics. A typical use case for reporting the consumer metrics
    * will call this method inside a scheduled thread.
@@ -120,6 +144,29 @@ public interface GobblinKafkaConsumerClient extends Closeable {
   }
 
   /**
+   * Commit offsets manually to Kafka asynchronously
+   */
+  default void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
+    return;
+  }
+
+  /**
+   * Commit offsets manually to Kafka synchronously
+   */
+  default void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
+    return;
+  }
+
+  /**
+   * Returns the last committed offset for a KafkaPartition.
+   * @param partition
+   * @return last committed offset or -1 for invalid KafkaPartition
+   */
+  default long committed(KafkaPartition partition) {
+    return -1L;
+  }
+
+  /**
    * A factory to create {@link GobblinKafkaConsumerClient}s
    */
   public interface GobblinKafkaConsumerClientFactory {
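
Taken together, the new default methods support a simple subscribe/poll/commit loop.
The sketch below is a usage illustration, not part of the commit; it assumes an
implementation (such as the Kafka09ConsumerClient extended in this commit) that
overrides the no-op defaults.

    import java.util.Collections;
    import java.util.Iterator;

    import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
    import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
    import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;

    public class ConsumeLoopSketch {

      /** Drains one poll's worth of records, committing as it goes. */
      static void drainOnce(GobblinKafkaConsumerClient client, String topic) {
        client.subscribe(topic);
        Iterator<KafkaConsumerRecord> records = client.consume();
        while (records.hasNext()) {
          KafkaConsumerRecord record = records.next();
          KafkaPartition partition = new KafkaPartition.Builder()
              .withTopicName(record.getTopic()).withId(record.getPartition()).build();
          // The committed offset is the next record to be read, hence offset + 1.
          client.commitOffsetsSync(Collections.singletonMap(partition, record.getOffset() + 1));
        }
      }
    }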
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
index 3279508..cba9fa2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
@@ -50,4 +50,23 @@ public interface KafkaConsumerRecord {
   public default boolean isTimestampLogAppend() {
     return false;
   }
+
+  default boolean isTimestampCreateTime() {
+    return false;
+  }
+
+  default boolean isTimestampNone() {
+    return false;
+  }
+
+  /**
+   * @return Partition id for this record
+   */
+  int getPartition();
+
+  /**
+   * @return topic for this record
+   */
+  String getTopic();
+
 }
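
These two accessors are what let the re-designed HighLevelConsumer (below) route records
deterministically: records from one partition always map to the same queue, so
per-partition ordering is preserved. A hypothetical standalone sketch of that routing:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.gobblin.kafka.client.KafkaConsumerRecord;

    public class PartitionRoutingSketch {
      private final BlockingQueue<KafkaConsumerRecord>[] queues;

      @SuppressWarnings("unchecked")
      public PartitionRoutingSketch(int numQueues) {
        this.queues = new LinkedBlockingQueue[numQueues];
        for (int i = 0; i < numQueues; i++) {
          this.queues[i] = new LinkedBlockingQueue<>();
        }
      }

      // Same partition -> same queue; distinct partitions may share a queue
      // when there are more partitions than queues.
      void route(KafkaConsumerRecord record) throws InterruptedException {
        this.queues[record.getPartition() % this.queues.length].put(record);
      }
    }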
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index 1255e52..bb8681e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -56,7 +56,7 @@ public class KafkaWriterConfigurationKeys {
   /** Kafka producer scoped configuration keys go here **/
   static final String KEY_SERIALIZER_CONFIG = "key.serializer";
   static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
-  static final String VALUE_SERIALIZER_CONFIG = "value.serializer";
+  public static final String VALUE_SERIALIZER_CONFIG = "value.serializer";
   static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
   static final String CLIENT_ID_CONFIG = "client.id";
   static final String CLIENT_ID_DEFAULT = "gobblin";
@@ -68,7 +68,7 @@ public class KafkaWriterConfigurationKeys {
   public static final String CLUSTER_ZOOKEEPER = KAFKA_TOPIC_CONFIG + "zookeeper";
   static final String REPLICATION_COUNT = KAFKA_TOPIC_CONFIG + "replicationCount";
   static final int REPLICATION_COUNT_DEFAULT = 1;
-  static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG + "partitionCount";
+  public static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG + "partitionCount";
   static final int PARTITION_COUNT_DEFAULT = 1;
   public static final String ZOOKEEPER_SESSION_TIMEOUT = CLUSTER_ZOOKEEPER + ".sto";
   static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 10000; // 10 seconds
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index b28db6b..0ff442a 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -79,7 +79,6 @@ dependencies {
   compile externalDependency.quartz
   compile externalDependency.slf4j
   compile externalDependency.typesafeConfig
-  compile externalDependency.kafka08
   compile externalDependency.guavaretrying
   compile externalDependency.hiveExec
   compile externalDependency.parquet
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
index 40e4e48..d9bc1aa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
@@ -23,12 +23,14 @@ import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Properties;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
index 527109d..14ec6cc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
@@ -32,11 +32,12 @@ import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 
-import com.codahale.metrics.Counter;
 import com.codahale.metrics.Meter;
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 import org.apache.gobblin.runtime.api.JobSpec;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 6ac9a68..0f47625 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -24,8 +24,11 @@ import java.util.Collection;
 import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
@@ -35,7 +38,6 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 
-import kafka.message.MessageAndMetadata;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -97,9 +99,9 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
   }
 
   @Override
-  protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
+  protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
-      Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
+      Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.getValue());
       for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
         if (parsedMessage instanceof Either.Left) {
           this.newSpecs.inc();
@@ -113,8 +115,8 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
         }
       }
     } catch (IOException ioe) {
-      String messageStr = new String(message.message(), Charsets.UTF_8);
-      log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
+      String messageStr = new String(message.getValue(), Charsets.UTF_8);
+      log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.getOffset(), messageStr), ioe);
     }
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index ae21921..4eb146b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -18,63 +18,76 @@
 package org.apache.gobblin.runtime.kafka;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
 
 import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 
 
 /**
- * A high level consumer for Kafka topics. Subclasses should implement {@link HighLevelConsumer#processMessage(MessageAndMetadata)}
+ * A high level consumer for Kafka topics. Subclasses should implement {@link HighLevelConsumer#processMessage(DecodeableKafkaRecord)}
  *
- * Note each thread will block for each message until {@link #processMessage(MessageAndMetadata)} returns, so for high
- * volume topics with few partitions {@link #processMessage(MessageAndMetadata)} must be fast or itself spawn more
- * threads.
+ * Note: each thread (queue) will block for each message until {@link #processMessage(DecodeableKafkaRecord)} returns.
  *
- * If threads > partitions in topic, extra threads will be idle.
+ * If threads (queues) > partitions in the topic, the extra threads (queues) will be idle.
  *
- * @param <K> type of the key.
- * @param <V> type of the value.
  */
 @Slf4j
-public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
+public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
+
+  public static final String CONSUMER_CLIENT_FACTORY_CLASS_KEY = "kafka.consumerClientClassFactory";
+  private static final String DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS =
+      "org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory";
+  public static final String ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
+  public static final boolean DEFAULT_AUTO_COMMIT_VALUE = false;
 
   public static final String GROUP_ID_KEY = "group.id";
   // NOTE: changing this will break stored offsets
   private static final String DEFAULT_GROUP_ID = "KafkaJobSpecMonitor";
+  public static final String OFFSET_COMMIT_NUM_RECORDS_THRESHOLD_KEY = "offsets.commit.num.records.threshold";
+  public static final int DEFAULT_OFFSET_COMMIT_NUM_RECORDS_THRESHOLD = 100;
+  public static final String OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY = "offsets.commit.time.threshold.secs";
+  public static final int DEFAULT_OFFSET_COMMIT_TIME_THRESHOLD_SECS = 10;
 
   @Getter
   protected final String topic;
   protected final Config config;
   private final int numThreads;
-  private final ConsumerConfig consumerConfig;
 
   /**
    * {@link MetricContext} for the consumer. Note this is instantiated when {@link #startUp()} is called, so
@@ -83,14 +96,71 @@ public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
   @Getter
   private MetricContext metricContext;
   private Counter messagesRead;
-  private ConsumerConnector consumer;
-  private ExecutorService executor;
+  @Getter
+  private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
+  private final ScheduledExecutorService consumerExecutor;
+  private final ExecutorService queueExecutor;
+  private final BlockingQueue[] queues;
+  private final AtomicInteger recordsProcessed;
+  private final Map<KafkaPartition, Long> partitionOffsetsToCommit;
+  private final boolean enableAutoCommit;
+  private final int offsetsCommitNumRecordsThreshold;
+  private final int offsetsCommitTimeThresholdSecs;
+  private long lastCommitTime = System.currentTimeMillis();
+
+  private static final Config FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(GROUP_ID_KEY, DEFAULT_GROUP_ID)
+          .put(ENABLE_AUTO_COMMIT_KEY, DEFAULT_AUTO_COMMIT_VALUE)
+          .put(CONSUMER_CLIENT_FACTORY_CLASS_KEY, DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS)
+          .put(OFFSET_COMMIT_NUM_RECORDS_THRESHOLD_KEY, DEFAULT_OFFSET_COMMIT_NUM_RECORDS_THRESHOLD)
+          .put(OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY, DEFAULT_OFFSET_COMMIT_TIME_THRESHOLD_SECS)
+          .build());
 
   public HighLevelConsumer(String topic, Config config, int numThreads) {
     this.topic = topic;
     this.numThreads = numThreads;
-    this.config = config;
-    this.consumerConfig = createConsumerConfig(config);
+    this.config = config.withFallback(FALLBACK);
+    this.gobblinKafkaConsumerClient = createConsumerClient(this.config);
+    // On partition rebalance, commit existing offsets and reset.
+    this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener() {
+      @Override
+      public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
+        copyAndCommit();
+        partitionOffsetsToCommit.clear();
+      }
+
+      @Override
+      public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
+        // No op
+      }
+    });
+    this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HighLevelConsumerThread")));
+    this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("QueueProcessor-%d")));
+    this.queues = new LinkedBlockingQueue[numThreads];
+    for(int i=0;i<queues.length;i++) {
+      this.queues[i] = new LinkedBlockingQueue();
+    }
+    this.recordsProcessed = new AtomicInteger(0);
+    this.partitionOffsetsToCommit = new ConcurrentHashMap<>();
+    this.enableAutoCommit = ConfigUtils.getBoolean(config, ENABLE_AUTO_COMMIT_KEY, DEFAULT_AUTO_COMMIT_VALUE);
+    this.offsetsCommitNumRecordsThreshold = ConfigUtils.getInt(config, OFFSET_COMMIT_NUM_RECORDS_THRESHOLD_KEY, DEFAULT_OFFSET_COMMIT_NUM_RECORDS_THRESHOLD);
+    this.offsetsCommitTimeThresholdSecs = ConfigUtils.getInt(config, OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY, DEFAULT_OFFSET_COMMIT_TIME_THRESHOLD_SECS);
+  }
+
+  protected GobblinKafkaConsumerClient createConsumerClient(Config config) {
+    String kafkaConsumerClientClass = config.getString(CONSUMER_CLIENT_FACTORY_CLASS_KEY);
+
+    try {
+      Class clientFactoryClass = Class.forName(kafkaConsumerClientClass);
+      final GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory factory =
+          (GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory)
+              ConstructorUtils.invokeConstructor(clientFactoryClass);
+
+      return factory.create(config);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Failed to instantiate Kafka consumer client " + kafkaConsumerClientClass, e);
+    }
   }
 
   /**
@@ -125,81 +195,129 @@ public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
   protected List<Tag<?>> getTagsForMetrics() {
     List<Tag<?>> tags = Lists.newArrayList();
     tags.add(new Tag<>(RuntimeMetrics.TOPIC, this.topic));
-    tags.add(new Tag<>(RuntimeMetrics.GROUP_ID, this.consumerConfig.groupId()));
+    tags.add(new Tag<>(RuntimeMetrics.GROUP_ID, this.config.getString(GROUP_ID_KEY)));
     return tags;
   }
 
   /**
-   * Called every time a message is read from the stream. Implementation must be thread-safe if {@link #numThreads} is
+   * Called every time a message is read from the queue. Implementation must be thread-safe if {@link #numThreads} is
    * set larger than 1.
    */
-  protected abstract void processMessage(MessageAndMetadata<K, V> message);
+  protected abstract void processMessage(DecodeableKafkaRecord<K,V> message);
 
   @Override
   protected void startUp() {
     buildMetricsContextAndMetrics();
-    this.consumer = createConsumerConnector();
-
-    List<KafkaStream<byte[], byte[]>> streams = createStreams();
-    this.executor = Executors.newFixedThreadPool(this.numThreads);
+    // Start the threads that process the queues
+    processQueues();
+    // Main thread that constantly polls messages from Kafka
+    consumerExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        consume();
+      }
+    }, 0, 50, TimeUnit.MILLISECONDS);
+  }
 
-    // now create an object to consume the messages
-    //
-    int threadNumber = 0;
-    for (final KafkaStream stream : streams) {
-      this.executor.execute(new MonitorConsumer(stream));
-      threadNumber++;
+  /**
+   * Consumes {@link KafkaConsumerRecord}s and adds them to a queue.
+   * Note: All records from a KafkaPartition are added to the same queue.
+   * A queue can contain records from multiple partitions if partitions > numThreads (queues).
+   */
+  private void consume() {
+    try {
+      Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
+      if (!enableAutoCommit) {
+        commitOffsets();
+      }
+      while (itr.hasNext()) {
+        KafkaConsumerRecord record = itr.next();
+        int idx = record.getPartition() % numThreads;
+        queues[idx].put(record);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
     }
   }
 
-  protected ConsumerConfig createConsumerConfig(Config config) {
-    Properties props = ConfigUtils.configToProperties(config);
+  /**
+   * Assigns a queue to each thread of the {@link #queueExecutor}.
+   * Note: The assumption here is that {@link #numThreads} is the same as the number of queues.
+   */
+  private void processQueues() {
+    for (BlockingQueue queue : queues) {
+      queueExecutor.execute(new QueueProcessor(queue));
+    }
+  }
 
-    if (!props.containsKey(GROUP_ID_KEY)) {
-      props.setProperty(GROUP_ID_KEY, DEFAULT_GROUP_ID);
+  /**
+   * Commits offsets to Kafka when the record-count or time threshold is reached.
+   */
+  private void commitOffsets() {
+    if (shouldCommitOffsets()) {
+      copyAndCommit();
     }
-    return new ConsumerConfig(props);
   }
 
-  protected ConsumerConnector createConsumerConnector() {
-    return Consumer.createJavaConsumerConnector(this.consumerConfig);
+  @VisibleForTesting
+  protected void commitOffsets(Map<KafkaPartition, Long> partitionOffsets) {
+    this.gobblinKafkaConsumerClient.commitOffsetsAsync(partitionOffsets);
   }
 
-  protected List<KafkaStream<byte[], byte[]>> createStreams() {
-    Map<String, Integer> topicCountMap = Maps.newHashMap();
-    topicCountMap.put(this.topic, this.numThreads);
-    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = this.consumer.createMessageStreams(topicCountMap);
-    return consumerMap.get(this.topic);
+  private void copyAndCommit() {
+    Map<KafkaPartition, Long> copy = new HashMap<>(partitionOffsetsToCommit);
+    recordsProcessed.set(0);
+    lastCommitTime = System.currentTimeMillis();
+    commitOffsets(copy);
+  }
+
+  private boolean shouldCommitOffsets() {
+    return recordsProcessed.intValue() >= offsetsCommitNumRecordsThreshold || ((System.currentTimeMillis() - lastCommitTime) / 1000 >= offsetsCommitTimeThresholdSecs);
   }
 
   @Override
   public void shutDown() {
-    if (this.consumer != null) {
-      this.consumer.shutdown();
-    }
-    if (this.executor != null) {
-      ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(log), 5000, TimeUnit.MILLISECONDS);
-    }
+    ExecutorsUtils.shutdownExecutorService(this.consumerExecutor, Optional.of(log), 5000, TimeUnit.MILLISECONDS);
+    ExecutorsUtils.shutdownExecutorService(this.queueExecutor, Optional.of(log), 5000, TimeUnit.MILLISECONDS);
     try {
+      this.gobblinKafkaConsumerClient.close();
       this.shutdownMetrics();
-    } catch (IOException ioe) {
-      log.warn("Failed to shutdown metrics for " + this.getClass().getSimpleName());
+    } catch (IOException e) {
+      log.warn("Failed to shut down consumer client or metrics ", e);
     }
   }
 
   /**
-   * A monitor for a Kafka stream.
+   * Polls a {@link BlockingQueue} indefinitely for {@link KafkaConsumerRecord}s.
+   * Processes each record and maintains the shared records-processed count.
+   * Also records the latest offset for every {@link KafkaPartition} if auto commit is disabled.
+   * Note: Messages in a queue will always be in order for every {@link KafkaPartition}.
    */
-  @AllArgsConstructor
-  public class MonitorConsumer implements Runnable {
-    private final KafkaStream stream;
+  class QueueProcessor implements Runnable {
+    private final BlockingQueue<KafkaConsumerRecord> queue;
+
+    public QueueProcessor(BlockingQueue queue) {
+      this.queue = queue;
+    }
 
+    @Override
     public void run() {
-      ConsumerIterator<K, V> it = this.stream.iterator();
-      while (it.hasNext()) {
-        MessageAndMetadata<K, V> message = it.next();
-        HighLevelConsumer.this.messagesRead.inc();
-        HighLevelConsumer.this.processMessage(message);
+      log.info("Starting queue processing.. " + Thread.currentThread().getName());
+      try {
+        while (true) {
+          KafkaConsumerRecord record = queue.take();
+          messagesRead.inc();
+          HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
+          recordsProcessed.incrementAndGet();
+
+          if (!HighLevelConsumer.this.enableAutoCommit) {
+            KafkaPartition partition = new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic).build();
+            // Committed offset should always be the offset of the next record to be read (hence +1)
+            partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
       }
     }
   }
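
With the 0.8 consumer-connector plumbing gone, subclassing reduces to implementing
processMessage(). A hypothetical minimal subclass, assuming the factory and commit
thresholds supplied by the FALLBACK config above:

    import com.google.common.base.Charsets;
    import com.typesafe.config.Config;

    import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
    import org.apache.gobblin.runtime.kafka.HighLevelConsumer;

    public class LoggingHighLevelConsumer extends HighLevelConsumer<byte[], byte[]> {

      public LoggingHighLevelConsumer(String topic, Config config, int numThreads) {
        // The base class wires up polling, per-partition queues and, with auto
        // commit disabled, threshold-based offset commits.
        super(topic, config, numThreads);
      }

      @Override
      protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
        // Must be thread-safe when numThreads > 1: one QueueProcessor per queue calls in here.
        System.out.println(new String(message.getValue(), Charsets.UTF_8));
      }
    }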
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
deleted file mode 100644
index c825a12..0000000
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.job_monitor;
-
-import java.net.URI;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
-
-
-public class KafkaJobMonitorTest {
-
-  @Test
-  public void test() throws Exception {
-
-    Config config = HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX));
-
-    MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(config);
-    monitor.startAsync();
-
-    monitor.getMockKafkaStream().pushToStream("job1:1");
-    monitor.awaitExactlyNSpecs(1);
-    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job1")));
-    Assert.assertEquals(monitor.getJobSpecs().get(new URI("job1")).getVersion(), "1");
-
-    monitor.getMockKafkaStream().pushToStream("job2:1");
-    monitor.awaitExactlyNSpecs(2);
-    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
-    Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "1");
-
-    monitor.getMockKafkaStream().pushToStream(MockedKafkaJobMonitor.REMOVE + ":job1");
-    monitor.awaitExactlyNSpecs(1);
-    Assert.assertFalse(monitor.getJobSpecs().containsKey(new URI("job1")));
-    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
-
-    monitor.getMockKafkaStream().pushToStream("job2:2,job1:2");
-    monitor.awaitExactlyNSpecs(2);
-    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job1")));
-    Assert.assertEquals(monitor.getJobSpecs().get(new URI("job1")).getVersion(), "2");
-    Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
-    Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "2");
-
-    monitor.shutDown();
-  }
-}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockKafkaStream.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockKafkaStream.java
deleted file mode 100644
index 7787519..0000000
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockKafkaStream.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.job_monitor;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.mockito.Mockito;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
-
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.FetchedDataChunk;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.PartitionTopicInfo;
-import kafka.consumer.ZookeeperConsumerConnector;
-import kafka.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.NoCompressionCodec$;
-import kafka.serializer.DefaultDecoder;
-import kafka.utils.VerifiableProperties;
-import lombok.Getter;
-import scala.collection.JavaConversions;
-
-
-public class MockKafkaStream {
-
-  private final List<BlockingQueue<FetchedDataChunk>> queues;
-  @Getter
-  private final List<KafkaStream<byte[], byte[]>> mockStreams;
-  private final List<AtomicLong> offsets;
-  private final AtomicLong nextStream;
-
-  public MockKafkaStream(int numStreams) {
-
-    this.queues = Lists.newArrayList();
-    this.mockStreams = Lists.newArrayList();
-    this.offsets = Lists.newArrayList();
-
-    for (int i = 0; i < numStreams; i++) {
-      BlockingQueue<FetchedDataChunk> queue = Queues.newLinkedBlockingQueue();
-      this.queues.add(queue);
-      this.mockStreams.add(createMockStream(queue));
-      this.offsets.add(new AtomicLong(0));
-    }
-
-    this.nextStream = new AtomicLong(-1);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static KafkaStream<byte[], byte[]> createMockStream(BlockingQueue<FetchedDataChunk> queue) {
-
-    KafkaStream<byte[], byte[]> stream = (KafkaStream<byte[], byte[]>) Mockito.mock(KafkaStream.class);
-    ConsumerIterator<byte[], byte[]> it =
-        new ConsumerIterator<>(queue, -1, new DefaultDecoder(new VerifiableProperties()), new DefaultDecoder(new VerifiableProperties()), "clientId");
-    Mockito.when(stream.iterator()).thenReturn(it);
-
-    return stream;
-  }
-
-  public void pushToStream(String message) {
-
-    int streamNo = (int) this.nextStream.incrementAndGet() % this.queues.size();
-
-    AtomicLong offset = this.offsets.get(streamNo);
-    BlockingQueue<FetchedDataChunk> queue = this.queues.get(streamNo);
-
-    AtomicLong thisOffset = new AtomicLong(offset.incrementAndGet());
-
-    List<Message> seq = Lists.newArrayList();
-    seq.add(new Message(message.getBytes(Charsets.UTF_8)));
-    ByteBufferMessageSet messageSet = new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, offset, JavaConversions.asScalaBuffer(seq));
-
-    FetchedDataChunk chunk = new FetchedDataChunk(messageSet,
-        new PartitionTopicInfo("topic", streamNo, queue, thisOffset, thisOffset, new AtomicInteger(1), "clientId"),
-        thisOffset.get());
-
-    queue.add(chunk);
-  }
-
-  public void shutdown() {
-    for (BlockingQueue<FetchedDataChunk> queue : this.queues) {
-      queue.add(ZookeeperConsumerConnector.shutdownCommand());
-    }
-  }
-
-}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
index 9e55236..0562534 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
@@ -31,6 +31,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
@@ -38,19 +39,18 @@ import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.Either;
 
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 
 @Slf4j
-class MockedKafkaJobMonitor extends KafkaJobMonitor {
+public class MockedKafkaJobMonitor extends KafkaJobMonitor {
 
   private static final Splitter SPLITTER_COMMA = Splitter.on(",");
   private static final Splitter SPLITTER_COLON = Splitter.on(":");
@@ -58,18 +58,15 @@ class MockedKafkaJobMonitor extends KafkaJobMonitor {
 
   @Getter
   private final Map<URI, JobSpec> jobSpecs;
-  @Getter
-  private final MockKafkaStream mockKafkaStream;
 
-  public static MockedKafkaJobMonitor create(Config config) {
-    return new MockedKafkaJobMonitor(config, Maps.<URI, JobSpec>newConcurrentMap());
+  public static MockedKafkaJobMonitor create(String topic, Config config) {
+    return new MockedKafkaJobMonitor(topic, config, Maps.<URI, JobSpec>newConcurrentMap());
   }
 
-  private MockedKafkaJobMonitor(Config config, Map<URI, JobSpec> jobSpecs) {
-    super("topic", createMockCatalog(jobSpecs), config);
+  private MockedKafkaJobMonitor(String topic, Config config, Map<URI, JobSpec> jobSpecs) {
+    super(topic, createMockCatalog(jobSpecs), config);
 
     this.jobSpecs = jobSpecs;
-    this.mockKafkaStream = new MockKafkaStream(1);
   }
 
   private static MutableJobCatalog createMockCatalog(final Map<URI, JobSpec> jobSpecs) {
@@ -127,18 +124,7 @@ class MockedKafkaJobMonitor extends KafkaJobMonitor {
   }
 
   @Override
-  protected List<KafkaStream<byte[], byte[]>> createStreams() {
-    return this.mockKafkaStream.getMockStreams();
-  }
-
-  @Override
-  protected ConsumerConnector createConsumerConnector() {
-    return Mockito.mock(ConsumerConnector.class);
-  }
-
-  @Override
   public void shutDown() {
-    this.mockKafkaStream.shutdown();
     super.shutDown();
   }
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
deleted file mode 100644
index e8d4e6c..0000000
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.kafka;
-
-import java.io.File;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.job_monitor.MockKafkaStream;
-
-
-public class HighLevelConsumerTest {
-
-  public static Config getSimpleConfig(Optional<String> prefix) {
-    Properties properties = new Properties();
-    properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper");
-    properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true");
-    File tmpDir = Files.createTempDir();
-    tmpDir.deleteOnExit();
-    properties.put(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, tmpDir.toString());
-
-    return ConfigFactory.parseProperties(properties);
-  }
-
-  private static String getConfigKey(Optional<String> prefix, String key) {
-    return prefix.isPresent() ? prefix.get() + "." + key : key;
-  }
-
-  @Test
-  public void test() throws Exception {
-
-    MockKafkaStream mockKafkaStream = new MockKafkaStream(5);
-    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(getSimpleConfig(Optional.<String>absent()), 5, mockKafkaStream);
-
-    consumer.startAsync();
-    consumer.awaitRunning();
-
-    Assert.assertTrue(consumer.getMessages().isEmpty());
-
-    mockKafkaStream.pushToStream("message");
-
-    consumer.awaitAtLeastNMessages(1);
-    Assert.assertEquals(consumer.getMessages().get(0), "message");
-
-    mockKafkaStream.pushToStream("message2");
-    consumer.awaitAtLeastNMessages(2);
-    Assert.assertEquals(consumer.getMessages().get(1), "message2");
-
-    consumer.shutDown();
-    mockKafkaStream.pushToStream("message3");
-    try {
-      consumer.awaitAtLeastNMessages(3);
-      Assert.fail();
-    } catch (TimeoutException ie) {
-      // should throw this
-    }
-  }
-}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
index 5035afa..520b354 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
@@ -18,66 +18,59 @@
 package org.apache.gobblin.runtime.kafka;
 
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
-import org.mockito.Mockito;
-
-import com.google.common.base.Charsets;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
-import org.apache.gobblin.runtime.job_monitor.MockKafkaStream;
-import org.apache.gobblin.testing.AssertWithBackoff;
-
 import javax.annotation.Nullable;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.testing.AssertWithBackoff;
 
 @Slf4j
-class MockedHighLevelConsumer extends HighLevelConsumer<byte[], byte[]> {
-  private final MockKafkaStream mockKafkaStream;
-  @Getter
-  private final List<String> messages;
+public class MockedHighLevelConsumer extends HighLevelConsumer<byte[], byte[]> {
 
-  public MockedHighLevelConsumer(Config config, int numThreads, MockKafkaStream stream) {
-    super("topic", config, numThreads);
+  @Getter
+  private final List<byte[]> messages;
+  @Getter
+  private final Map<KafkaPartition, Long> committedOffsets;
 
-    this.mockKafkaStream = stream;
+  public MockedHighLevelConsumer(String topic, Config config, int numThreads) {
+    super(topic, config, numThreads);
     this.messages = Lists.newArrayList();
+    this.committedOffsets = new ConcurrentHashMap<>();
   }
 
-  public void awaitAtLeastNMessages(final int n) throws Exception {
+  public void awaitExactlyNMessages(final int n, int timeoutMillis) throws Exception {
     AssertWithBackoff.assertTrue(new Predicate<Void>() {
       @Override
       public boolean apply(@Nullable Void input) {
-        return MockedHighLevelConsumer.this.messages.size() >= n;
+        return MockedHighLevelConsumer.this.messages.size() == n;
       }
-    }, 1000, n + " messages", log, 2, 1000);
-  }
-
-  @Override
-  protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
-    this.messages.add(new String(message.message(), Charsets.UTF_8));
+    }, timeoutMillis, n + " messages", log, 2, 1000);
   }
 
   @Override
-  protected List<KafkaStream<byte[], byte[]>> createStreams() {
-    return this.mockKafkaStream.getMockStreams();
+  protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+    this.messages.add(message.getValue());
   }
 
   @Override
-  protected ConsumerConnector createConsumerConnector() {
-    return Mockito.mock(ConsumerConnector.class);
+  protected void commitOffsets(Map<KafkaPartition, Long> partitionOffsets) {
+    super.commitOffsets(partitionOffsets);
+    committedOffsets.putAll(partitionOffsets);
   }
 
   @Override
   public void shutDown() {
-    this.mockKafkaStream.shutdown();
     super.shutDown();
   }
 }
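
A typical test interaction with this mock, sketched under the assumption that `topic`
and `config` point at a live test broker (as the new HighLevelConsumerTest sets up):

    import org.testng.Assert;

    import com.typesafe.config.Config;

    import org.apache.gobblin.runtime.kafka.MockedHighLevelConsumer;

    public class MockedConsumerUsageSketch {

      static void verifyFiveMessages(String topic, Config config) throws Exception {
        MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(topic, config, 5);
        consumer.startAsync();
        consumer.awaitRunning();
        // ... produce exactly five records to `topic` here ...
        consumer.awaitExactlyNMessages(5, 10000);
        Assert.assertEquals(consumer.getMessages().size(), 5);
        consumer.shutDown();
      }
    }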
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index bafa6d1..dee31a3 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -60,7 +60,6 @@ dependencies {
   compile externalDependency.jgit
   compile externalDependency.jodaTime
   compile externalDependency.jgrapht
-  compile externalDependency.kafka08
   compile externalDependency.log4j
   compile externalDependency.lombok
   compile externalDependency.metricsCore
@@ -70,12 +69,12 @@ dependencies {
   compile externalDependency.slf4j
   compile externalDependency.typesafeConfig
   compile externalDependency.zkClient
+  compile externalDependency.joptSimple
 
   testCompile project(":gobblin-example")
 
   // Required for adding Test class into classpath
   testCompile project(":gobblin-runtime").sourceSets.test.output
-  testCompile project(path: ":gobblin-modules:gobblin-kafka-08:", configuration: "tests")
   testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
   testCompile project(":gobblin-test-utils")
   testCompile externalDependency.byteman
@@ -88,7 +87,6 @@ dependencies {
   testCompile externalDependency.hamcrest
   testCompile externalDependency.jhyde
   testCompile externalDependency.mockito
-  testCompile externalDependency.kafka08Test
 }
 
 // Begin HACK to get around POM being depenendent on the (empty) gobblin-rest-api instead of gobblin-rest-api-rest-client
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index be0947d..279c769 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -33,11 +33,11 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import avro.shaded.com.google.common.annotations.VisibleForTesting;
-import kafka.message.MessageAndMetadata;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
@@ -114,15 +114,15 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   }
 
   @Override
-  protected void processMessage(MessageAndMetadata<byte[],byte[]> message) {
+  protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
-      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.message());
+      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.getValue());
       if (jobStatus != null) {
         addJobStatusToStateStore(jobStatus, this.stateStore);
       }
     } catch (IOException ioe) {
-      String messageStr = new String(message.message(), Charsets.UTF_8);
-      log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
+      String messageStr = new String(message.getValue(), Charsets.UTF_8);
+      log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.getOffset(), messageStr), ioe);
     }
   }