Posted to commits@gobblin.apache.org by su...@apache.org on 2020/12/04 05:20:41 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1259] Implement a Kafka streaming extractor

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 44d7aa0  [GOBBLIN-1259] Implement a Kafka streaming extractor
44d7aa0 is described below

commit 44d7aa00b1a8ecf50974d1004f8ff18677eeffed
Author: Sudarshan Vasudevan <sv...@users.noreply.github.com>
AuthorDate: Thu Dec 3 21:20:34 2020 -0800

    [GOBBLIN-1259] Implement a Kafka streaming extractor
    
    Closes #3102 from sv2000/kafkaStreamingExtractor
---
 .../runtime/CheckpointableWatermarkState.java      |   2 +-
 .../apache/gobblin/stream/FlushRecordEnvelope.java |  23 +
 gobblin-core-base/build.gradle                     |   1 +
 .../extractor/extract/FlushingExtractor.java       | 367 ++++++++++++++
 .../runtime/StateStoreBasedWatermarkStorage.java   |   0
 .../apache/gobblin/metrics/MetricContextUtils.java |  40 ++
 .../couchbase/writer/CouchbaseWriterBuilder.java   |   4 +-
 gobblin-modules/gobblin-kafka-common/build.gradle  |   2 +-
 .../kafka/client/GobblinKafkaConsumerClient.java   |   3 +
 .../extractor/extract/kafka/KafkaPartition.java    |   2 +-
 .../extract/kafka/KafkaProduceRateTracker.java     | 311 ++++++++++++
 .../extract/kafka/KafkaStreamingExtractor.java     | 554 +++++++++++++++++++++
 .../packer/KafkaTopicGroupingWorkUnitPacker.java   | 350 +++++++++++++
 ...roduceRateAndLagBasedWorkUnitSizeEstimator.java | 150 ++++++
 .../packer/UnitKafkaWorkUnitSizeEstimator.java     |  36 ++
 .../gobblin/metrics/kafka/LoggingPusherTest.java   |   2 +
 .../extract/kafka/KafkaExtractorUtils.java         |  93 ++++
 .../extract/kafka/KafkaProduceRateTrackerTest.java | 220 ++++++++
 .../extract/kafka/KafkaStreamTestUtils.java        | 314 ++++++++++++
 .../extract/kafka/KafkaStreamingExtractorTest.java | 134 +++++
 .../KafkaTopicGroupingWorkUnitPackerTest.java      | 154 ++++++
 ...ceRateAndLagBasedWorkUnitSizeEstimatorTest.java |  94 ++++
 gobblin-runtime/build.gradle                       |   1 +
 gradle/scripts/dependencyDefinitions.gradle        |  15 +-
 24 files changed, 2860 insertions(+), 12 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
similarity index 96%
rename from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
rename to gobblin-api/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
index 791f1b3..08dc5fa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
@@ -25,7 +25,7 @@ import org.apache.gobblin.source.extractor.CheckpointableWatermark;
 
 /**
  * Making {@link CheckpointableWatermark} look like {@link State} so it can be
- * stored in a {@link org.apache.gobblin.metastore.StateStore}.
+ * stored in a Gobblin state store.
  */
 public class CheckpointableWatermarkState extends State {
 
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushRecordEnvelope.java b/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushRecordEnvelope.java
new file mode 100644
index 0000000..bc73965
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushRecordEnvelope.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.stream;
+
+public class FlushRecordEnvelope extends RecordEnvelope {
+  public FlushRecordEnvelope() {
+    super(null);
+  }
+}
diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index 4c9b667..fb7a4a7 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -21,6 +21,7 @@ apply plugin: 'me.champeau.gradle.jmh'
 
 dependencies {
   compile project(":gobblin-api")
+  compile project(":gobblin-metastore")
   compile project(":gobblin-utility")
   compile project(":gobblin-metrics-libs:gobblin-metrics")
   compile project(":gobblin-modules:gobblin-codecs")
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
new file mode 100644
index 0000000..9391a30
--- /dev/null
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.ack.Ackable;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metrics.MetricContextUtils;
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.runtime.StateStoreBasedWatermarkStorage;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.stream.FlushControlMessage;
+import org.apache.gobblin.stream.FlushRecordEnvelope;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.stream.StreamEntity;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.writer.LastWatermarkTracker;
+import org.apache.gobblin.writer.WatermarkStorage;
+import org.apache.gobblin.writer.WatermarkTracker;
+
+
+/**
+ * An abstract {@link EventBasedExtractor} that injects a {@link FlushControlMessage} into the record stream
+ * at a frequency determined by {@value #FLUSH_INTERVAL_SECONDS_KEY}.
+ * The {@link FlushControlMessage} blocks further records from being emitted until the outstanding flush
+ * has been acked by the underlying writer. On a successful ack, the extractor invokes publish on the underlying DataPublisher,
+ * which moves the data from the task output location to the final publish location. After a successful publish, the FlushingExtractor
+ * commits the watermarks to the {@link WatermarkStorage}. Note that the watermark committed to the watermark storage is the
+ * last successfully acked watermark. Individual extractor implementations should resume from the position immediately
+ * after this watermark. For example, in the case of a Kafka extractor, the consumer should seek to the offset that is
+ * one more than the last committed watermark.
+ *
+ * Individual extractor implementations that extend the FlushingExtractor need to implement the following method:
+ * <ul>
+ *   <li>readRecordEnvelopeImpl() - returns the next {@link RecordEnvelope}.</li>
+ * </ul>
+ *
+ * The FlushingExtractor allows applications to plug in pre- and post-commit {@link CommitStep}s as actions to be
+ * performed before and after each commit.
+ * @param <D> type of {@link RecordEnvelope}
+ */
+
+@Slf4j
+public abstract class FlushingExtractor<S, D> extends EventBasedExtractor<S, D> {
+  public static final String GOBBLIN_EXTRACTOR_PRECOMMIT_STEPS = "gobblin.extractor.precommit.steps";
+  public static final String GOBBLIN_EXTRACTOR_POSTCOMMIT_STEPS = "gobblin.extractor.postcommit.steps";
+  public static final String FLUSH_INTERVAL_SECONDS_KEY = "stream.flush.interval.secs";
+  public static final Long DEFAULT_FLUSH_INTERVAL_SECONDS = 60L;
+
+  public static final String FLUSH_DATA_PUBLISHER_CLASS = "flush.data.publisher.class";
+  public static final String DEFAULT_FLUSH_DATA_PUBLISHER_CLASS = "org.apache.gobblin.publisher.BaseDataPublisher";
+
+  public static final String WATERMARK_COMMIT_TIME_METRIC = "state.store.metrics.watermarkCommitTime";
+  public static final String COMMIT_STEP_METRIC_PREFIX = "commit.step.";
+
+  @Getter
+  protected Map<String, CheckpointableWatermark> lastCommittedWatermarks;
+
+  private final List<String> preCommitSteps;
+  private final List<String> postCommitSteps;
+
+  private final Map<String, CommitStep> commitStepMap = Maps.newHashMap();
+
+  private final AtomicLong watermarkCommitTime = new AtomicLong(0L);
+  private final List<AtomicLong> preCommitStepTimes = Lists.newArrayList();
+  private final List<AtomicLong> postCommitStepTimes = Lists.newArrayList();
+
+  protected Config config;
+  @Setter
+  private Optional<WatermarkStorage> watermarkStorage;
+  @Getter
+  protected WatermarkTracker watermarkTracker;
+  protected Long flushIntervalMillis;
+
+  protected Long timeOfLastFlush = System.currentTimeMillis();
+  private FlushAckable lastFlushAckable;
+  private boolean hasOutstandingFlush = false;
+  private Optional<DataPublisher> flushPublisher = Optional.absent();
+  protected WorkUnitState workUnitState;
+
+  public FlushingExtractor(WorkUnitState state) {
+    super(state);
+    this.workUnitState = state;
+    this.config = ConfigFactory.parseProperties(state.getProperties());
+    this.flushIntervalMillis =
+        ConfigUtils.getLong(config, FLUSH_INTERVAL_SECONDS_KEY, DEFAULT_FLUSH_INTERVAL_SECONDS) * 1000;
+    this.watermarkTracker = new LastWatermarkTracker(false);
+    this.watermarkStorage = Optional.of(new StateStoreBasedWatermarkStorage(state));
+    this.preCommitSteps = ConfigUtils.getStringList(config, GOBBLIN_EXTRACTOR_PRECOMMIT_STEPS);
+    this.postCommitSteps = ConfigUtils.getStringList(config, GOBBLIN_EXTRACTOR_POSTCOMMIT_STEPS);
+    preCommitSteps.stream().map(commitStep -> new AtomicLong(0L)).forEach(this.preCommitStepTimes::add);
+    postCommitSteps.stream().map(commitStep -> new AtomicLong(0L)).forEach(this.postCommitStepTimes::add);
+
+    initFlushPublisher();
+    MetricContextUtils.registerGauge(this.getMetricContext(), WATERMARK_COMMIT_TIME_METRIC, this.watermarkCommitTime);
+    initCommitStepMetrics(this.preCommitSteps, this.postCommitSteps);
+  }
+
+  private void initCommitStepMetrics(List<String>... commitStepLists) {
+    for (List<String> commitSteps : commitStepLists) {
+      for (String commitStepAlias : commitSteps) {
+        String metricName = COMMIT_STEP_METRIC_PREFIX + commitStepAlias + ".time";
+        MetricContextUtils.registerGauge(this.getMetricContext(), metricName, new AtomicLong(0L));
+      }
+    }
+  }
+
+  private StreamEntity<D> generateFlushMessageIfNecessary() {
+    Long currentTime = System.currentTimeMillis();
+    if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
+      return generateFlushMessage(currentTime);
+    }
+    return null;
+  }
+
+  private StreamEntity<D> generateFlushMessage(Long currentTime) {
+    log.debug("Injecting flush control message");
+    FlushControlMessage<D> flushMessage = FlushControlMessage.<D>builder().flushReason("Timed flush").build();
+    FlushAckable flushAckable = new FlushAckable();
+    // add a flush ackable to wait for the flush to complete before returning from this flush call
+    flushMessage.addCallBack(flushAckable);
+
+    //Preserve the latest flushAckable.
+    this.lastFlushAckable = flushAckable;
+    this.hasOutstandingFlush = true;
+    timeOfLastFlush = currentTime;
+    return flushMessage;
+  }
+
+  /**
+   * Instantiate the {@link DataPublisher} used to publish task output after each flush.
+   * The publisher class is configured via {@value #FLUSH_DATA_PUBLISHER_CLASS}
+   * (default: {@value #DEFAULT_FLUSH_DATA_PUBLISHER_CLASS}) and is instantiated reflectively
+   * with the {@link WorkUnitState} of this extractor. The resulting instance is cached in
+   * {@link #flushPublisher} so that subsequent calls are no-ops.
+   */
+  private void initFlushPublisher() {
+    if (this.flushPublisher.isPresent()) {
+      return;
+    }
+    String publisherClassName =
+        ConfigUtils.getString(this.config, FLUSH_DATA_PUBLISHER_CLASS, DEFAULT_FLUSH_DATA_PUBLISHER_CLASS);
+
+    try {
+      this.flushPublisher = (Optional<DataPublisher>) Optional.of(
+          GobblinConstructorUtils.invokeLongestConstructor(Class.forName(publisherClassName), this.workUnitState));
+    } catch (ReflectiveOperationException e) {
+      log.error("Error in instantiating Data Publisher");
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public StreamEntity<D> readStreamEntityImpl() throws DataRecordException, IOException {
+    //Block until an outstanding flush has been Ack-ed.
+    if (this.hasOutstandingFlush) {
+      Throwable error = this.lastFlushAckable.waitForAck();
+      if (error != null) {
+        throw new RuntimeException("Error waiting for flush ack", error);
+      }
+
+      //Reset outstandingFlush flag
+      this.hasOutstandingFlush = false;
+
+      //Run pre-commit steps
+      doCommitSequence(preCommitSteps, true);
+
+      //Publish task output to final publish location.
+      publishTaskOutput();
+
+      //Provide a callback to the underlying extractor to handle logic for flush ack.
+      onFlushAck();
+
+      //Run post-commit steps
+      doCommitSequence(postCommitSteps, false);
+    }
+
+    StreamEntity<D> entity = generateFlushMessageIfNecessary();
+    if (entity != null) {
+      return entity;
+    }
+
+    //return the next read record.
+    RecordEnvelope<D> recordEnvelope = readRecordEnvelopeImpl();
+    if (recordEnvelope instanceof FlushRecordEnvelope) {
+      StreamEntity<D> flushMessage = generateFlushMessage(System.currentTimeMillis());
+      return flushMessage;
+    }
+    if (recordEnvelope != null) {
+      this.watermarkTracker.unacknowledgedWatermark(recordEnvelope.getWatermark());
+    }
+
+    return recordEnvelope;
+  }
+
+  /**
+   * Instantiates a {@link CommitStep} given an alias. The base implementation returns null; subclasses override it.
+   * @param commitStepAlias alias or fully qualified class name of the {@link CommitStep}.
+   * @param isPrecommit true if the step runs before the publish, false if it runs after.
+   */
+  public CommitStep initCommitStep(String commitStepAlias, boolean isPrecommit) throws IOException {
+    return null;
+  }
+
+  private void doCommitSequence(List<String> commitSteps, boolean isPrecommit) throws IOException {
+    for (int i = 0; i < commitSteps.size(); i++) {
+      long startTimeMillis = System.currentTimeMillis();
+      String commitStepAlias = commitSteps.get(i);
+      CommitStep commitStep = commitStepMap.get(commitStepAlias);
+      if (commitStep == null) {
+        commitStep = initCommitStep(commitSteps.get(i), isPrecommit);
+        commitStepMap.put(commitStepAlias, commitStep);
+      }
+      log.info("Calling commit step: {}", commitStepAlias);
+      commitStep.execute();
+      long commitStepTime = System.currentTimeMillis() - startTimeMillis;
+      if (isPrecommit) {
+        preCommitStepTimes.get(i).set(commitStepTime);
+      } else {
+        postCommitStepTimes.get(i).set(commitStepTime);
+      }
+    }
+  }
+
+  /**
+   * A callback invoked when a flush has been acknowledged. The default implementation checkpoints the
+   * unacknowledged watermarks; underlying extractors can override this method to add their own handling.
+   */
+  protected void onFlushAck() throws IOException {
+    checkPointWatermarks();
+  }
+
+  /**
+   * Retrieves the latest committed watermarks from the {@link WatermarkStorage}. This method is typically called
+   * by the underlying extractor during initialization to determine where to resume consumption from.
+   * @param checkPointableWatermarkClass a {@link CheckpointableWatermark} class
+   * @param partitions a collection of partitions assigned to the extractor
+   * @return the latest committed watermarks as a map of (source, watermark) pairs. For example, in the case of a KafkaStreamingExtractor,
+   * this map would be a collection of (TopicPartition, KafkaOffset) pairs.
+   */
+  public Map<String, CheckpointableWatermark> getCommittedWatermarks(Class checkPointableWatermarkClass,
+      Iterable<String> partitions) {
+    Preconditions.checkArgument(CheckpointableWatermark.class.isAssignableFrom(checkPointableWatermarkClass),
+        "Watermark class " + checkPointableWatermarkClass.toString() + " is not a CheckPointableWatermark class");
+    try {
+      this.lastCommittedWatermarks =
+          this.watermarkStorage.get().getCommittedWatermarks(checkPointableWatermarkClass, partitions);
+    } catch (Exception e) {
+      // failed to get watermarks ... log a warning message
+      log.warn("Failed to get watermarks... will start from the beginning", e);
+      this.lastCommittedWatermarks = Maps.newHashMap();
+    }
+    return this.lastCommittedWatermarks;
+  }
+
+  /**
+   * Publish task output to final publish location.
+   */
+  protected void publishTaskOutput() throws IOException {
+    if (!this.flushPublisher.isPresent()) {
+      throw new IOException("Publish called without a flush publisher");
+    }
+    this.flushPublisher.get().publish(Collections.singletonList(workUnitState));
+  }
+
+  /**
+   * Persist the watermarks tracked by the {@link WatermarkTracker} to the {@link WatermarkStorage}.
+   * This method is called after a {@link FlushControlMessage} has been acknowledged. To make retrieval of
+   * the last committed watermarks efficient, it also caches the watermarks present in the unacknowledged
+   * watermark map.
+   *
+   * @throws IOException
+   */
+  private void checkPointWatermarks() throws IOException {
+    Map<String, CheckpointableWatermark> unacknowledgedWatermarks =
+        this.watermarkTracker.getAllUnacknowledgedWatermarks();
+    if (this.watermarkStorage.isPresent()) {
+      long commitBeginTime = System.currentTimeMillis();
+      this.watermarkStorage.get().commitWatermarks(unacknowledgedWatermarks.values());
+      this.watermarkCommitTime.set(System.currentTimeMillis() - commitBeginTime);
+      //Cache the last committed watermarks
+      for (Map.Entry<String, CheckpointableWatermark> entry : unacknowledgedWatermarks.entrySet()) {
+        this.lastCommittedWatermarks.put(entry.getKey(), entry.getValue());
+      }
+    } else {
+      log.warn("No watermarkStorage found; Skipping checkpointing");
+    }
+  }
+
+  /**
+   * A method to be implemented by the underlying extractor that returns the next record as an instance of
+   * {@link RecordEnvelope}
+   * @return the next {@link RecordEnvelope} instance read from the source
+   */
+  public abstract RecordEnvelope<D> readRecordEnvelopeImpl() throws DataRecordException, IOException;
+
+  /**
+   * {@link Ackable} for waiting for the flush control message to be processed
+   */
+  private static class FlushAckable implements Ackable {
+    private Throwable error;
+    private final CountDownLatch processed;
+
+    public FlushAckable() {
+      this.processed = new CountDownLatch(1);
+    }
+
+    @Override
+    public void ack() {
+      this.processed.countDown();
+    }
+
+    @Override
+    public void nack(Throwable error) {
+      this.error = error;
+      this.processed.countDown();
+    }
+
+    /**
+     * Wait for ack
+     * @return any error encountered
+     */
+    public Throwable waitForAck() {
+      try {
+        this.processed.await();
+        return this.error;
+      } catch (InterruptedException e) {
+        throw new RuntimeException("interrupted while waiting for ack");
+      }
+    }
+  }
+}
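
To illustrate how a concrete source plugs into the FlushingExtractor above, here is a minimal, hypothetical sketch (not part of this commit): the subclass only implements readRecordEnvelopeImpl(), and can hand back a FlushRecordEnvelope to ask the base class to inject a flush, exactly as the KafkaStreamingExtractor later in this diff does. Class, field and schema names are illustrative, and any remaining abstract methods of the Extractor contract are elided.

package org.apache.gobblin.example;

import java.io.BufferedReader;
import java.io.IOException;

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
import org.apache.gobblin.stream.FlushRecordEnvelope;
import org.apache.gobblin.stream.RecordEnvelope;

/** Hypothetical line-oriented extractor built on FlushingExtractor; for illustration only. */
public class LineStreamingExtractor extends FlushingExtractor<String, String> {
  private final BufferedReader reader;

  public LineStreamingExtractor(WorkUnitState state, BufferedReader reader) {
    // The super constructor parses stream.flush.interval.secs, wires up the watermark tracker,
    // the watermark storage and the flush-time DataPublisher.
    super(state);
    this.reader = reader;
  }

  @Override
  public String getSchema() {
    return "line"; // trivial schema for the sketch; real sources would return e.g. an Avro schema
  }

  @Override
  public RecordEnvelope<String> readRecordEnvelopeImpl() throws DataRecordException, IOException {
    String line = this.reader.readLine();
    if (line == null) {
      // Nothing available right now: return a FlushRecordEnvelope so the base class injects a
      // FlushControlMessage, publishes task output and checkpoints the watermarks.
      return new FlushRecordEnvelope();
    }
    // A real extractor would attach a source-specific CheckpointableWatermark to the envelope
    // so that FlushingExtractor can track it and commit it on the next successful flush.
    return new RecordEnvelope<>(line);
  }

  // Remaining abstract methods of the Extractor contract (e.g. getExpectedRecordCount()) are
  // omitted from this sketch.
}

The flush cadence for such an extractor is controlled by stream.flush.interval.secs (default 60 seconds), and the publisher used on each flush by flush.data.publisher.class.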
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
similarity index 100%
rename from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
rename to gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java
new file mode 100644
index 0000000..41c2bf9
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Gauge;
+import com.google.common.util.concurrent.AtomicDouble;
+
+
+public class MetricContextUtils {
+
+  private MetricContextUtils() {
+  }
+  public static void registerGauge(MetricContext metricContext, String metricName, AtomicLong atomicLong) {
+    Gauge<Long> gauge = metricContext.newContextAwareGauge(metricName, () -> atomicLong.get());
+    metricContext.register(metricName, gauge);
+  }
+
+  public static void registerGauge(MetricContext metricContext, String metricName, AtomicDouble atomicDouble) {
+    Gauge<Double> gauge = metricContext.newContextAwareGauge(metricName, () -> atomicDouble.get());
+    metricContext.register(metricName, gauge);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
index de155da..d961ddb 100644
--- a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
+++ b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
@@ -18,10 +18,10 @@
 package org.apache.gobblin.couchbase.writer;
 
 import com.couchbase.client.java.env.CouchbaseEnvironment;
+import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import java.io.IOException;
 import java.util.Properties;
-import junit.framework.Assert;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
@@ -34,7 +34,7 @@ import org.apache.log4j.Logger;
 public class CouchbaseWriterBuilder extends DataWriterBuilder {
   private static final Logger LOG = Logger.getLogger(CouchbaseWriterBuilder.class);
   public DataWriter build(Config config) throws IOException {
-    Assert.assertNotNull("Config cannot be null", config);
+    Preconditions.checkArgument(config != null, "Config cannot be null");
     config.entrySet().stream().forEach(x -> String.format("Config passed to factory builder '%s':'%s'", x.getKey(), x.getValue().toString()));
     CouchbaseEnvironment couchbaseEnvironment = CouchbaseEnvironmentFactory.getInstance(config);
 
diff --git a/gobblin-modules/gobblin-kafka-common/build.gradle b/gobblin-modules/gobblin-kafka-common/build.gradle
index d92774b..8cd83a4 100644
--- a/gobblin-modules/gobblin-kafka-common/build.gradle
+++ b/gobblin-modules/gobblin-kafka-common/build.gradle
@@ -40,7 +40,7 @@ dependencies {
   compile externalDependency.slf4j
   compile externalDependency.typesafeConfig
   compile externalDependency.findBugsAnnotations
-
+  compile externalDependency.jollyday
 
   testCompile project(":gobblin-test-utils")
   testCompile externalDependency.mockito
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index 684a48c..c75408e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -28,6 +28,7 @@ import com.codahale.metrics.Metric;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
@@ -166,6 +167,8 @@ public interface GobblinKafkaConsumerClient extends Closeable {
     return -1L;
   }
 
+  default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { }
+
   /**
    * A factory to create {@link GobblinKafkaConsumerClient}s
    */
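
The assignAndSeek() default method added above lets streaming extractors pin the consumer to their assigned partitions and resume from previously committed watermarks; clients that do not override it simply no-op. A sketch of the intended call pattern (the helper class and its names are illustrative, not part of this commit):

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;

/** Hypothetical helper showing the intended use of assignAndSeek(); for illustration only. */
public class AssignAndSeekUsage {
  public static Iterator<KafkaConsumerRecord> resume(GobblinKafkaConsumerClient client,
      List<KafkaPartition> assignedPartitions, Map<KafkaPartition, LongWatermark> committedWatermarks) {
    // Assign the consumer to the given partitions and seek each one to the offset recorded in its
    // last committed watermark before starting to consume.
    client.assignAndSeek(assignedPartitions, committedWatermarks);
    return client.consume();
  }
}

This mirrors how the KafkaStreamingExtractor added later in this diff calls assignAndSeek() with the watermarks it reads back through getCommittedWatermarks().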
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
index 29cea5b..df00618 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
@@ -99,7 +99,7 @@ public final class KafkaPartition {
 
   @Override
   public String toString() {
-    return this.getTopicName() + ":" + this.getId();
+    return this.getTopicName() + "-" + this.getId();
   }
 
   @Override
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
new file mode 100644
index 0000000..b3e67e6
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.EvictingQueue;
+import com.google.common.collect.Maps;
+
+import de.jollyday.HolidayManager;
+import de.jollyday.parameter.UrlManagerParameter;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.writer.WatermarkTracker;
+
+/**
+ * A helper class that tracks the produce rate for each TopicPartition currently consumed by the {@link KafkaStreamingExtractor}.
+ * The produce rates are stored in the {@link org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor.KafkaWatermark}
+ * and checkpointed to the {@link org.apache.gobblin.writer.WatermarkStorage} along with the watermarks.
+ *
+ * The produce rates are maintained per hour-of-day and day-of-week, and are computed in bytes/sec. New produce
+ * rate estimates are obtained as an Exponentially Weighted Moving Average (EWMA):
+ * new_produce_rate = a * avg_rate_in_current_window + (1 - a) * historic_produce_rate, where
+ * "a" is the exponential decay factor. Small values of "a" cause estimates to update more slowly, while large
+ * values of "a" give more weight to recent values.
+ */
+public class KafkaProduceRateTracker {
+  private static final String KAFKA_PRODUCE_RATE_EXPONENTIAL_DECAY_FACTOR_KEY =
+      "gobblin.kafka.produceRateTracker.exponentialDecayFactor";
+  private static final Double DEFAULT_KAFKA_PRODUCE_RATE_EXPONENTIAL_DECAY_FACTOR = 0.375;
+  static final String KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY = "gobblin.kafka.produceRateTracker.disableStatsOnHolidays";
+  private static final Boolean DEFAULT_KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS = false;
+  private static final String KAFKA_PRODUCE_RATE_HOLIDAY_LOCALE_KEY = "gobblin.kafka.produceRateTracker.holidayLocale";
+  private static final String DEFAULT_KAFKA_PRODUCE_RATE_HOLIDAY_LOCALE = "ca";
+  private static final String HOLIDAY_FILE = "Holidays.xml";
+  private static final DateTimeZone DEFAULT_TIME_ZONE = DateTimeZone.getDefault();
+  static final int SLIDING_WINDOW_SIZE = 3;
+
+  static final String KAFKA_PARTITION_PRODUCE_RATE_KEY = "produceRate";
+
+  /**
+   * The element-insertion order has to be maintained because:
+   * 1. The API provided by {@link KafkaExtractorStatsTracker} accepts a partition index, rather than a partition
+   * object as {@link #partitionsToProdRate} does.
+   * 2. When traversing {@link #partitionsToProdRate} to update a produce-rate value, we need to find the previous
+   * watermark value used to compute the rate, which is also indexed by partition index.
+   * 3. To keep entries in {@link #partitionsToProdRate}, {@link KafkaExtractorStatsTracker} and the watermarks
+   * aligned, the order of entries in this map must follow the input list of {@link KafkaPartition}s, since that
+   * list also defines the entry order in {@link KafkaExtractorStatsTracker}.
+   */
+  @Getter
+  private final LinkedHashMap<KafkaPartition, Double> partitionsToProdRate;
+  private final WatermarkTracker watermarkTracker;
+  private final KafkaExtractorStatsTracker statsTracker;
+  private final Double exponentialDecayFactor;
+  private final Boolean disableStatsOnHolidays;
+  private final HolidayManager holidayManager;
+  private final long flushIntervalSecs;
+  private final String holidayLocale;
+  private Long lastReportTimeMillis;
+  private final Map<LocalDate, Boolean> holidayMap = Maps.newHashMap();
+
+  private final EvictingQueue<Long> ingestionLatencies = EvictingQueue.create(SLIDING_WINDOW_SIZE);
+  private final EvictingQueue<Double> consumptionRateMBps = EvictingQueue.create(SLIDING_WINDOW_SIZE);
+
+  public KafkaProduceRateTracker(WorkUnitState state, List<KafkaPartition> partitions, WatermarkTracker watermarkTracker,
+      KafkaExtractorStatsTracker statsTracker) {
+    this(state, partitions, watermarkTracker, statsTracker, System.currentTimeMillis());
+  }
+
+  @VisibleForTesting
+  KafkaProduceRateTracker(WorkUnitState state, List<KafkaPartition> partitions, WatermarkTracker watermarkTracker,
+      KafkaExtractorStatsTracker statsTracker, Long lastReportTimeMillis) {
+    this.partitionsToProdRate = (LinkedHashMap<KafkaPartition, Double>) partitions.stream()
+        .collect(Collectors.toMap(Function.identity(), x -> new Double(-1), (e1, e2) -> e1, LinkedHashMap::new));
+    this.watermarkTracker = watermarkTracker;
+    this.statsTracker = statsTracker;
+    this.lastReportTimeMillis = lastReportTimeMillis;
+    this.exponentialDecayFactor = state.getPropAsDouble(KAFKA_PRODUCE_RATE_EXPONENTIAL_DECAY_FACTOR_KEY, DEFAULT_KAFKA_PRODUCE_RATE_EXPONENTIAL_DECAY_FACTOR);
+
+    URL calendarFileUrl = getClass().getClassLoader().getResource(HOLIDAY_FILE);
+    this.holidayManager =
+        calendarFileUrl != null ? HolidayManager.getInstance(new UrlManagerParameter(calendarFileUrl, new Properties()))
+            : HolidayManager.getInstance();
+    this.disableStatsOnHolidays = state.getPropAsBoolean(KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY,
+        DEFAULT_KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS);
+    this.holidayLocale = state.getProp(KAFKA_PRODUCE_RATE_HOLIDAY_LOCALE_KEY, DEFAULT_KAFKA_PRODUCE_RATE_HOLIDAY_LOCALE);
+    this.flushIntervalSecs = state.getPropAsLong(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY, FlushingExtractor.DEFAULT_FLUSH_INTERVAL_SECONDS);
+  }
+
+  public static int getHourOfDay(Date date) {
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTime(date);
+    //HOUR_OF_DAY ranges from 0-23.
+    return calendar.get(Calendar.HOUR_OF_DAY);
+  }
+
+  public static int getDayOfWeek(Date date) {
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTime(date);
+    //DAY_OF_WEEK ranges from 1-7; normalize it to 0-6.
+    return calendar.get(Calendar.DAY_OF_WEEK) - 1;
+  }
+
+  @Getter
+  @AllArgsConstructor
+  public static class TopicPartitionStats {
+    private double[][] avgProduceRates;
+    private double avgConsumeRate;
+    private long avgRecordSize;
+    // The most recently observed produce rate of the partition.
+    private double currentProduceRate;
+  }
+
+  private TopicPartitionStats getNewTopicPartitionStats(Long previousMaxOffset, Long maxOffset, KafkaStreamingExtractor.KafkaWatermark lastCommittedWatermark,
+      long currentTimeMillis, Long avgRecordSize) {
+    if (previousMaxOffset == 0) {
+      TopicPartitionStats stats = new TopicPartitionStats(lastCommittedWatermark.getAvgProduceRates(), lastCommittedWatermark.getAvgConsumeRate(),
+          lastCommittedWatermark.getAvgRecordSize(), 0);
+      return stats;
+    }
+    long numRecordsProduced = maxOffset - previousMaxOffset;
+    long newAvgRecordSize;
+    if (numRecordsProduced > 0) {
+      if (lastCommittedWatermark.getAvgRecordSize() > 0) {
+        newAvgRecordSize = updateMovingAverage(avgRecordSize, lastCommittedWatermark.getAvgRecordSize()).longValue();
+      } else {
+        //No previously recorded average record size.
+        newAvgRecordSize = avgRecordSize;
+      }
+    } else {
+      //No records seen in the current window. No need to update the average record size.
+      newAvgRecordSize = lastCommittedWatermark.getAvgRecordSize();
+    }
+    Date date = new Date(currentTimeMillis);
+    int hourOfDay = getHourOfDay(date);
+    int dayOfWeek = getDayOfWeek(date);
+    double currentProduceRate =
+        (numRecordsProduced * avgRecordSize) * 1000 / (double) (currentTimeMillis - lastReportTimeMillis + 1);
+    double[][] historicProduceRates = lastCommittedWatermark.getAvgProduceRates();
+    if (!isHoliday(new LocalDate(currentTimeMillis, DEFAULT_TIME_ZONE))) {
+      if (historicProduceRates != null) {
+        if (historicProduceRates[dayOfWeek][hourOfDay] >= 0) {
+          historicProduceRates[dayOfWeek][hourOfDay] =
+              updateMovingAverage(currentProduceRate, historicProduceRates[dayOfWeek][hourOfDay]);
+        } else {
+          historicProduceRates[dayOfWeek][hourOfDay] = currentProduceRate;
+        }
+      } else {
+        //No previous values found. Bootstrap with the average rate computed in the current window.
+        historicProduceRates = new double[7][24];
+        for (double[] row : historicProduceRates) {
+          Arrays.fill(row, -1.0);
+        }
+        historicProduceRates[dayOfWeek][hourOfDay] = currentProduceRate;
+      }
+    }
+
+    double consumeRate = lastCommittedWatermark.getAvgConsumeRate();
+    ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.SECONDS));
+    consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
+
+    if (isConsumerBacklogged()) {
+      //If ingestion latency is consistently high, the consumer is backlogged and its observed consumption rate
+      //approximates its peak consumption rate, so fold it into the average.
+      consumeRate = consumeRate >= 0 ? updateMovingAverage(getPenultimateElement(consumptionRateMBps), consumeRate)
+          : this.statsTracker.getConsumptionRateMBps();
+    }
+    return new TopicPartitionStats(historicProduceRates, consumeRate, newAvgRecordSize, currentProduceRate);
+  }
+
+  private boolean isConsumerBacklogged() {
+    if (this.ingestionLatencies.size() < SLIDING_WINDOW_SIZE) {
+      return false;
+    }
+    for (long latency: this.ingestionLatencies) {
+      if (latency < (2 * this.flushIntervalSecs)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the element before the final element. It does this by removing the oldest element and peeking at the
+   * next element; note that the queue is mutated by this call.
+   * @param queue the sliding-window queue; must contain more than one element
+   * @return the element before the most recent element
+   */
+  static Double getPenultimateElement(EvictingQueue<Double> queue) {
+    Preconditions.checkArgument(queue.size() > 1);
+    queue.remove();
+    return queue.peek();
+  }
+
+  /**
+   * A method that computes a new moving average from previous average estimate and current value using an
+   * Exponentially weighted moving average (EWMA) algorithm.
+   * @param currentValue
+   * @param previousAverage
+   * @return updated moving average computed as an EWMA.
+   */
+  private Double updateMovingAverage(double currentValue, double previousAverage) {
+    return exponentialDecayFactor * currentValue + (1 - exponentialDecayFactor) * previousAverage;
+  }
+
+  /**
+   * This method has several side effects:
+   * 1. Writes the produce rate of each KafkaPartition into its watermark, as the method name indicates.
+   * 2. Updates {@link #partitionsToProdRate} for each KafkaPartition with its newest produce rate; this becomes
+   * part of the GTE emitted on each flush.
+   */
+  public void writeProduceRateToKafkaWatermarks(Map<KafkaPartition, Long> latestOffsetMap, Map<String, CheckpointableWatermark> lastCommittedWatermarks,
+      MultiLongWatermark highWatermark, long currentTimeMillis) {
+    int partitionIndex = 0;
+    Map<String, CheckpointableWatermark> unacknowledgedWatermarks = watermarkTracker.getAllUnacknowledgedWatermarks();
+    for (KafkaPartition partition : this.partitionsToProdRate.keySet()) {
+      long maxOffset = latestOffsetMap.getOrDefault(partition, -1L);
+
+      KafkaStreamingExtractor.KafkaWatermark kafkaWatermark =
+          (KafkaStreamingExtractor.KafkaWatermark) lastCommittedWatermarks.get(partition.toString());
+      KafkaStreamingExtractor.KafkaWatermark unacknowledgedWatermark =
+          (KafkaStreamingExtractor.KafkaWatermark) unacknowledgedWatermarks.get(partition.toString());
+      if (unacknowledgedWatermark == null && kafkaWatermark == null) {
+        //If there is no previously committed watermark for the topic partition and no records in current time window, no further
+        //processing needed for the topic partition.
+        continue;
+      }
+      if (kafkaWatermark == null) {
+        //If there is no previously committed watermark for the topic partition, create a dummy watermark for computing stats
+        kafkaWatermark = new KafkaStreamingExtractor.KafkaWatermark(partition, new LongWatermark(0L));
+      }
+      long avgRecordSize = this.statsTracker.getAvgRecordSize(partitionIndex);
+      long previousMaxOffset = highWatermark.get(partitionIndex++);
+      //If maxOffset < 0, it means that we could not get max offsets from Kafka due to metadata fetch failure.
+      // In this case, carry previous state forward and set produce-rate to negative value, indicating it's not available.
+      TopicPartitionStats stats =
+          maxOffset >= 0 ? getNewTopicPartitionStats(previousMaxOffset, maxOffset, kafkaWatermark, currentTimeMillis,
+              avgRecordSize)
+              : new TopicPartitionStats(kafkaWatermark.getAvgProduceRates(), kafkaWatermark.getAvgConsumeRate(), kafkaWatermark.getAvgRecordSize(), -1);
+
+      if (unacknowledgedWatermark == null) {
+        //If no record seen for this topicPartition in the current time window; carry forward the previously committed
+        // watermark with the updated statistics
+        unacknowledgedWatermark = kafkaWatermark;
+        watermarkTracker.unacknowledgedWatermark(unacknowledgedWatermark);
+      }
+      unacknowledgedWatermark.setAvgProduceRates(stats.getAvgProduceRates());
+      unacknowledgedWatermark.setAvgConsumeRate(stats.getAvgConsumeRate());
+      unacknowledgedWatermark.setAvgRecordSize(stats.getAvgRecordSize());
+      partitionsToProdRate.put(partition, stats.getCurrentProduceRate());
+    }
+    this.lastReportTimeMillis = currentTimeMillis;
+  }
+
+  /**
+   * @param date the date to check
+   * @return true only if:
+   *  <ul>
+   *    <li>stats collection on holidays is disabled, and</li>
+   *    <li>{@code date} is a holiday for the configured locale</li>
+   *  </ul>
+   */
+  boolean isHoliday(LocalDate date) {
+    if (!this.disableStatsOnHolidays) {
+      return false;
+    }
+    if (holidayMap.containsKey(date)) {
+      return holidayMap.get(date);
+    } else {
+      boolean isHolidayToday = this.holidayManager.isHoliday(date, this.holidayLocale);
+      holidayMap.put(date, isHolidayToday);
+      return isHolidayToday;
+    }
+  }
+}
\ No newline at end of file
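
As a standalone restatement of the EWMA update described in the KafkaProduceRateTracker Javadoc above: the 7x24 (day-of-week by hour-of-day) bucketing, the -1.0 "no data" marker and the 0.375 default decay factor mirror the class, while the helper itself and its names are illustrative, not part of this commit.

import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;

/** Hypothetical standalone sketch of the produce-rate EWMA bucket update; for illustration only. */
public class ProduceRateEwmaSketch {
  private static final double DECAY_FACTOR = 0.375; // the "a" in the Javadoc above

  /** Folds the current window's produce rate (bytes/sec) into the 7x24 historic-rate table. */
  public static double[][] update(double[][] historicRates, double currentRate, long nowMillis) {
    if (historicRates == null) {
      // Bootstrap: no previous estimates; -1.0 marks buckets with no data yet.
      historicRates = new double[7][24];
      for (double[] row : historicRates) {
        Arrays.fill(row, -1.0);
      }
    }
    Calendar calendar = Calendar.getInstance();
    calendar.setTime(new Date(nowMillis));
    int dayOfWeek = calendar.get(Calendar.DAY_OF_WEEK) - 1; // 0-6
    int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);     // 0-23
    double previous = historicRates[dayOfWeek][hourOfDay];
    historicRates[dayOfWeek][hourOfDay] = previous >= 0
        ? DECAY_FACTOR * currentRate + (1 - DECAY_FACTOR) * previous  // EWMA update
        : currentRate;                                                // first observation for this bucket
    return historicRates;
  }
}

For example, with a = 0.375, a bucket holding 800 bytes/sec that observes a 1200 bytes/sec window is updated to 0.375 * 1200 + 0.625 * 800 = 950 bytes/sec.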
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
new file mode 100644
index 0000000..8412592
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.avro.Schema;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.google.gson.JsonElement;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.ComparableWatermark;
+import org.apache.gobblin.source.extractor.Watermark;
+import org.apache.gobblin.source.extractor.WatermarkSerializerHelper;
+import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.FlushRecordEnvelope;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.source.extractor.extract.kafka.KafkaProduceRateTracker.KAFKA_PARTITION_PRODUCE_RATE_KEY;
+import static org.apache.gobblin.source.extractor.extract.kafka.KafkaSource.DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS;
+import static org.apache.gobblin.source.extractor.extract.kafka.KafkaSource.GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS;
+import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker.NUM_PARTITIONS_ASSIGNED;
+
+/**
+ * An implementation of {@link org.apache.gobblin.source.extractor.Extractor} which reads from Kafka and returns
+ * records as {@link DecodeableKafkaRecord}s; the record type depends on the configured deserializer.
+ */
+@Slf4j
+public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableKafkaRecord> {
+  public static final String DATASET_KEY = "dataset";
+  public static final String DATASET_PARTITION_KEY = "datasetPartition";
+  private static final Long MAX_LOG_DECODING_ERRORS = 5L;
+  private static final String KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY =
+      "gobblin.kafka.extractor.statsReportingIntervalMinutes";
+  private static final Long DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES = 1L;
+
+  private final ClassAliasResolver<GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory>
+      kafkaConsumerClientResolver;
+  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+  private final Map<String, AtomicDouble> consumerMetricsGauges = new ConcurrentHashMap<>();
+  private final KafkaExtractorStatsTracker statsTracker;
+  private final KafkaProduceRateTracker produceRateTracker;
+  private final List<KafkaPartition> partitions;
+  private final long extractorStatsReportingTimeIntervalMillis;
+  //Mapping from Kafka Partition Id to partition index
+  @Getter
+  private final Map<Integer, Integer> partitionIdToIndexMap;
+  private final String recordCreationTimestampFieldName;
+  private final TimeUnit recordCreationTimestampUnit;
+
+  private Iterator<KafkaConsumerRecord> messageIterator = null;
+  private long readStartTime;
+  private long lastExtractorStatsReportingTime;
+  private Map<KafkaPartition, Long> latestOffsetMap = Maps.newHashMap();
+
+  protected MultiLongWatermark lowWatermark;
+  protected MultiLongWatermark highWatermark;
+  protected MultiLongWatermark nextWatermark;
+  protected Map<Integer, DecodeableKafkaRecord> perPartitionLastSuccessfulRecord;
+  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+  @Override
+  public void shutdown() {
+    this.shutdownRequested.set(true);
+  }
+
+  @ToString
+  public static class KafkaWatermark implements CheckpointableWatermark {
+    @Getter
+    KafkaPartition topicPartition;
+    LongWatermark _lwm;
+    //Average TopicPartition produce rate by hour-of-day and day-of-week in bytes/sec.
+    @Getter
+    @Setter
+    double[][] avgProduceRates;
+    //Average consume rate for the topic when backlogged.
+    @Getter
+    @Setter
+    double avgConsumeRate = -1.0;
+    @Getter
+    @Setter
+    long avgRecordSize;
+
+    @VisibleForTesting
+    public KafkaWatermark(KafkaPartition topicPartition, LongWatermark lwm) {
+      this.topicPartition = topicPartition;
+      _lwm = lwm;
+    }
+
+    @Override
+    public String getSource() {
+      return topicPartition.toString();
+    }
+
+    @Override
+    public ComparableWatermark getWatermark() {
+      return _lwm;
+    }
+
+    @Override
+    public short calculatePercentCompletion(Watermark lowWatermark, Watermark highWatermark) {
+      return 0;
+    }
+
+    @Override
+    public JsonElement toJson() {
+      return WatermarkSerializerHelper.convertWatermarkToJson(this);
+    }
+
+    @Override
+    public int compareTo(CheckpointableWatermark o) {
+      Preconditions.checkArgument(o instanceof KafkaWatermark);
+      KafkaWatermark ko = (KafkaWatermark) o;
+      Preconditions.checkArgument(topicPartition.equals(ko.topicPartition));
+      return _lwm.compareTo(ko._lwm);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof KafkaWatermark)) {
+        return false;
+      }
+      return this.compareTo((CheckpointableWatermark) obj) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      return topicPartition.hashCode() * prime + _lwm.hashCode();
+    }
+
+    public LongWatermark getLwm() {
+      return _lwm;
+    }
+  }
+
+  AtomicLong _rowCount = new AtomicLong(0);
+  protected final Optional<KafkaSchemaRegistry<String, S>> _schemaRegistry;
+  protected final GobblinKafkaConsumerClient kafkaConsumerClient;
+
+  private final List<KafkaPartition> topicPartitions; // list of topic partitions assigned to this extractor
+
+  public KafkaStreamingExtractor(WorkUnitState state) {
+    super(state);
+    this.kafkaConsumerClientResolver =
+        new ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class);
+    try {
+      this.kafkaConsumerClient = this.closer.register(
+          this.kafkaConsumerClientResolver.resolveClass(state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
+              DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS))
+              .newInstance()
+              .create(ConfigUtils.propertiesToConfig(state.getProperties())));
+    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+    this._schemaRegistry = state.contains(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) ? Optional.of(
+        KafkaSchemaRegistry.<String, S>get(state.getProperties())) : Optional.<KafkaSchemaRegistry<String, S>>absent();
+
+    this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
+    this.kafkaConsumerClient.assignAndSeek(topicPartitions, getTopicPartitionWatermarks(this.topicPartitions));
+    this.messageIterator = this.kafkaConsumerClient.consume();
+
+    this.partitions = KafkaUtils.getPartitions(state);
+    this.partitionIdToIndexMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    try {
+      this.latestOffsetMap = this.kafkaConsumerClient.getLatestOffsets(this.partitions);
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      log.error("Failed to retrieve latest Kafka offsets", e);
+    }
+    this.statsTracker = new KafkaExtractorStatsTracker(state, partitions);
+    this.produceRateTracker = new KafkaProduceRateTracker(state, partitions, getWatermarkTracker(), statsTracker);
+    this.extractorStatsReportingTimeIntervalMillis =
+        state.getPropAsLong(KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY,
+            DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES) * 60 * 1000;
+    resetExtractorStatsAndWatermarks(true);
+
+    //Schedule a thread for reporting Kafka consumer metrics
+    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+      Map<String, Metric> codahaleMetricMap = kafkaConsumerClient.getMetrics();
+      for (Map.Entry<String, Metric> metricEntry : codahaleMetricMap.entrySet()) {
+        if (log.isDebugEnabled()) {
+          log.debug("Metric name: {}, Value: {}", metricEntry.getKey(),
+              ((Gauge<Double>) metricEntry.getValue()).getValue());
+        }
+        consumerMetricsGauges.computeIfAbsent(metricEntry.getKey(), k -> {
+          AtomicDouble d = new AtomicDouble();
+          ContextAwareGauge<Double> consumerMetricGauge =
+              getMetricContext().newContextAwareGauge(metricEntry.getKey(), () -> d.get());
+          getMetricContext().register(metricEntry.getKey(), consumerMetricGauge);
+          return d;
+        }).set(((Gauge<Double>) metricEntry.getValue()).getValue());
+      }
+    }, 0, 60, TimeUnit.SECONDS);
+
+    this.recordCreationTimestampFieldName =
+        this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, null);
+    this.recordCreationTimestampUnit = TimeUnit.valueOf(
+        this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
+  }
+
+  private Map<KafkaPartition, LongWatermark> getTopicPartitionWatermarks(List<KafkaPartition> topicPartitions) {
+    List<String> topicPartitionStrings =
+        topicPartitions.stream().map(topicPartition -> topicPartition.toString()).collect(Collectors.toList());
+    // read watermarks from storage
+    Map<String, CheckpointableWatermark> kafkaWatermarkMap =
+        super.getCommittedWatermarks(KafkaWatermark.class, topicPartitionStrings);
+
+    Map<KafkaPartition, LongWatermark> longWatermarkMap = new HashMap<>();
+    for (KafkaPartition topicPartition : topicPartitions) {
+      String topicPartitionString = topicPartition.toString();
+      if (kafkaWatermarkMap.containsKey(topicPartitionString)) {
+        LongWatermark longWatermark = ((KafkaWatermark) kafkaWatermarkMap.get(topicPartitionString)).getLwm();
+        longWatermarkMap.put(topicPartition, longWatermark);
+      } else {
+        longWatermarkMap.put(topicPartition, new LongWatermark(0L));
+      }
+    }
+    return longWatermarkMap;
+  }
+
+  private List<KafkaPartition> getTopicPartitionsFromWorkUnit(WorkUnitState state) {
+    // what topic partitions are we responsible for?
+    List<KafkaPartition> topicPartitions = new ArrayList<>();
+
+    WorkUnit workUnit = state.getWorkunit();
+    String topicNameProp = KafkaSource.TOPIC_NAME;
+    int numOfPartitions =
+        workUnit.contains(NUM_PARTITIONS_ASSIGNED) ? Integer.parseInt(workUnit.getProp(NUM_PARTITIONS_ASSIGNED)) : 0;
+
+    for (int i = 0; i < numOfPartitions; ++i) {
+      if (workUnit.getProp(topicNameProp, null) == null) {
+        log.warn("There's no topic.name property being set in workunt which could be an illegal state");
+        break;
+      }
+      String topicName = workUnit.getProp(topicNameProp);
+
+      String partitionIdProp = KafkaSource.PARTITION_ID + "." + i;
+      int partitionId = workUnit.getPropAsInt(partitionIdProp);
+      KafkaPartition topicPartition = new KafkaPartition.Builder().withTopicName(topicName).withId(partitionId).build();
+      topicPartitions.add(topicPartition);
+    }
+    return topicPartitions;
+  }
+
+  /**
+   * Get the schema (metadata) of the extracted data records.
+   *
+   * @return the schema of Kafka topic being extracted
+   */
+  @Override
+  public S getSchema() {
+    try {
+      Schema schema = (Schema) this._schemaRegistry.get().getLatestSchemaByTopic(this.topicPartitions.get(0).getTopicName());
+      return (S) schema;
+    } catch (SchemaRegistryException e) {
+      log.error("Failed to fetch the latest schema for topic {}", this.topicPartitions.get(0).getTopicName(), e);
+    }
+    return null;
+  }
+
+  @Override
+  public List<Tag<?>> generateTags(State state) {
+    List<Tag<?>> tags = super.generateTags(state);
+    String clusterIdentifier = ClustersNames.getInstance().getClusterName();
+    tags.add(new Tag<>("clusterIdentifier", clusterIdentifier));
+    return tags;
+  }
+
+  /**
+   * Return the next record. Return null if shutdown has been requested; return a {@link FlushRecordEnvelope} when it is time to flush.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOException {
+    if (this.shutdownRequested.get()) {
+      return null;
+    }
+    this.readStartTime = System.nanoTime();
+    long fetchStartTime = System.nanoTime();
+    try {
+      while (this.messageIterator == null || !this.messageIterator.hasNext()) {
+        Long currentTime = System.currentTimeMillis();
+        //it's time to flush, so exit the loop and return a FlushRecordEnvelope
+        if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
+          return new FlushRecordEnvelope();
+        }
+        try {
+          fetchStartTime = System.nanoTime();
+          this.messageIterator = this.kafkaConsumerClient.consume();
+        } catch (Exception e) {
+          log.error("Failed to consume from Kafka", e);
+        }
+      }
+      DecodeableKafkaRecord kafkaConsumerRecord = (DecodeableKafkaRecord) this.messageIterator.next();
+
+      int partitionIndex = this.partitionIdToIndexMap.get(kafkaConsumerRecord.getPartition());
+      this.statsTracker.onFetchNextMessageBuffer(partitionIndex, fetchStartTime);
+
+      // track time for converting KafkaConsumerRecord to a RecordEnvelope
+      long decodeStartTime = System.nanoTime();
+      KafkaPartition topicPartition =
+          new KafkaPartition.Builder().withTopicName(kafkaConsumerRecord.getTopic()).withId(kafkaConsumerRecord.getPartition()).build();
+      RecordEnvelope<DecodeableKafkaRecord> recordEnvelope = new RecordEnvelope(kafkaConsumerRecord,
+          new KafkaWatermark(topicPartition, new LongWatermark(kafkaConsumerRecord.getOffset())));
+      recordEnvelope.setRecordMetadata("topicPartition", topicPartition);
+      recordEnvelope.setRecordMetadata(DATASET_KEY, topicPartition.getTopicName());
+      recordEnvelope.setRecordMetadata(DATASET_PARTITION_KEY, "" + topicPartition.getId());
+      this.statsTracker.onDecodeableRecord(partitionIndex, readStartTime, decodeStartTime,
+          kafkaConsumerRecord.getValueSizeInBytes(),
+          kafkaConsumerRecord.isTimestampLogAppend() ? kafkaConsumerRecord.getTimestamp() : 0L,
+          (this.recordCreationTimestampFieldName != null) ? kafkaConsumerRecord.getRecordCreationTimestamp(
+              this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit) : 0L);
+      this.perPartitionLastSuccessfulRecord.put(partitionIndex, kafkaConsumerRecord);
+      this.nextWatermark.set(partitionIndex, kafkaConsumerRecord.getNextOffset());
+      return recordEnvelope;
+    } catch (Throwable t) {
+      this.statsTracker.onUndecodeableRecord(0);
+      if (shouldLogError()) {
+        log.error("Error when decoding a Kafka consumer record");
+      }
+      throw new IOException("Error in extraction", t);
+    }
+  }
+
+  private boolean shouldLogError() {
+    return this.statsTracker.getUndecodableMessageCount() <= MAX_LOG_DECODING_ERRORS;
+  }
+
+  @Override
+  protected void onFlushAck() throws IOException {
+    try {
+      //Refresh the latest offsets of TopicPartitions processed by the KafkaExtractor.
+      this.latestOffsetMap = this.kafkaConsumerClient.getLatestOffsets(this.partitions);
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      log.error("Unable to retrieve latest offsets due to {}", e);
+    }
+    long currentTime = System.currentTimeMillis();
+    //Update the watermarks to include the current topic partition produce rates
+    this.produceRateTracker.writeProduceRateToKafkaWatermarks(this.latestOffsetMap, getLastCommittedWatermarks(),
+        this.highWatermark, currentTime);
+
+    // Assemble additional tags to be part of GTE, for now only Partition-ProduceRate.
+    Map<KafkaPartition, Map<String, String>> additionalTags = getAdditionalTagsHelper();
+
+    //Commit offsets to the watermark storage.
+    super.onFlushAck();
+
+    //Emit GobblinTrackingEvent with current extractor stats and reset them before the next epoch starts.
+    if (this.isInstrumentationEnabled()) {
+      if (currentTime - this.lastExtractorStatsReportingTime > this.extractorStatsReportingTimeIntervalMillis) {
+        for (int partitionIndex = 0; partitionIndex < this.partitions.size(); partitionIndex++) {
+          this.statsTracker.updateStatisticsForCurrentPartition(partitionIndex, readStartTime,
+              getLastSuccessfulRecordHeaderTimestamp(partitionIndex));
+        }
+        Map<KafkaPartition, Map<String, String>> tagsForPartitions =
+            this.statsTracker.generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark, additionalTags);
+        this.statsTracker.emitTrackingEvents(getMetricContext(), tagsForPartitions);
+        this.resetExtractorStatsAndWatermarks(false);
+        this.lastExtractorStatsReportingTime = currentTime;
+      }
+    }
+  }
+
+  @Override
+  public CommitStep initCommitStep(String commitStepAlias, boolean isPrecommit) throws IOException {
+    try {
+      log.info("Instantiating {}", commitStepAlias);
+      return (CommitStep) GobblinConstructorUtils.invokeLongestConstructor(
+          new ClassAliasResolver(CommitStep.class).resolveClass(commitStepAlias), config, statsTracker);
+    } catch (ReflectiveOperationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * A helper method that transforms the Map<KafkaPartition, Double> of partition produce rates into a
+   * Map<KafkaPartition, Map<String, String>> of per-partition tags, using Collectors.toMap to construct the
+   * inner single-entry map inline.
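+   * For example (illustrative values), an entry {topicPartition -> 2.5} in the produce rate map becomes
+   * {topicPartition -> {KAFKA_PARTITION_PRODUCE_RATE_KEY -> "2.5"}} in the returned map.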
+   */
+  Map<KafkaPartition, Map<String, String>> getAdditionalTagsHelper() {
+    return produceRateTracker.getPartitionsToProdRate()
+        .entrySet()
+        .stream()
+        .collect(Collectors.toMap(Map.Entry::getKey,
+            entry -> Stream.of(new AbstractMap.SimpleEntry<>(KAFKA_PARTITION_PRODUCE_RATE_KEY, entry.getValue().toString()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
+  }
+
+  @VisibleForTesting
+  public void resetExtractorStatsAndWatermarks(boolean isInit) {
+    if (isInit) {
+      //Initialize nextwatermark, highwatermark and lowwatermarks for Extractor stats reporting.
+      this.perPartitionLastSuccessfulRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
+      this.lastExtractorStatsReportingTime = System.currentTimeMillis();
+      this.lowWatermark =
+          new MultiLongWatermark(this.partitions.stream().map(partition -> 0L).collect(Collectors.toList()));
+      this.highWatermark =
+          new MultiLongWatermark(this.partitions.stream().map(partition -> 0L).collect(Collectors.toList()));
+    }
+
+    this.workUnitState.removeProp(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME);
+    this.workUnitState.removeProp(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME);
+    this.workUnitState.removeProp(KafkaSource.PREVIOUS_LOW_WATERMARK);
+    this.workUnitState.removeProp(KafkaSource.PREVIOUS_HIGH_WATERMARK);
+    this.workUnitState.removeProp(KafkaSource.PREVIOUS_LATEST_OFFSET);
+
+    int partitionIndex = 0;
+    for (KafkaPartition partition : partitions) {
+      if (isInit) {
+        this.partitionIdToIndexMap.put(partition.getId(), partitionIndex);
+      }
+
+      this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_HIGH_WATERMARK, partitionIndex),
+          this.highWatermark.get(partitionIndex));
+      this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LOW_WATERMARK, partitionIndex),
+          this.lowWatermark.get(partitionIndex));
+      this.workUnitState.setProp(
+          KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, partitionIndex),
+          this.statsTracker.getStatsMap().get(partitions.get(partitionIndex)).getStartFetchEpochTime());
+      this.workUnitState.setProp(
+          KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionIndex),
+          this.statsTracker.getStatsMap().get(partitions.get(partitionIndex)).getStopFetchEpochTime());
+      this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, partitionIndex),
+          this.highWatermark.get(partitionIndex));
+
+      KafkaWatermark kafkaWatermark = (KafkaWatermark) this.lastCommittedWatermarks.get(partition.toString());
+      long lowWatermarkValue = 0L;
+      if (kafkaWatermark != null) {
+        lowWatermarkValue = kafkaWatermark.getLwm().getValue() + 1;
+      }
+      this.lowWatermark.set(partitionIndex, lowWatermarkValue);
+      if (latestOffsetMap.containsKey(partition)) {
+        this.highWatermark.set(partitionIndex, latestOffsetMap.get(partition));
+      }
+      partitionIndex++;
+    }
+    this.nextWatermark = new MultiLongWatermark(this.lowWatermark);
+
+    // Add error partition count and error message count to workUnitState
+    this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.statsTracker.getErrorPartitionCount());
+    this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT,
+        this.statsTracker.getUndecodableMessageCount());
+    this.workUnitState.setActualHighWatermark(this.nextWatermark);
+
+    //Reset stats tracker
+    this.statsTracker.reset();
+  }
+
+  protected long getLastSuccessfulRecordHeaderTimestamp(int partitionId) {
+    return 0;
+  }
+
+  /**
+   * Callback that asks the extractor to stop processing the partitions described by the given work unit.
+   * @param workUnitState
+   */
+  public boolean onWorkUnitRemove(WorkUnitState workUnitState) {
+    // TODO: check if these topic partitions actually were part of the assignment
+    // add to queue of flush control messages
+    // set up ack on them
+    return false;
+  }
+
+  public boolean onWorkUnitAdd(WorkUnitState workUnitState) {
+    List<KafkaPartition> newTopicPartitions = getTopicPartitionsFromWorkUnit(workUnitState);
+    // get watermarks for these topic partitions
+    Map<KafkaPartition, LongWatermark> topicWatermarksMap = getTopicPartitionWatermarks(newTopicPartitions);
+    this.topicPartitions.addAll(newTopicPartitions);
+    this.kafkaConsumerClient.assignAndSeek(topicPartitions, topicWatermarksMap);
+    return true;
+  }
+
+  @Override
+  public long getExpectedRecordCount() {
+    return _rowCount.get();
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.closer.close();
+  }
+
+  @Deprecated
+  @Override
+  public long getHighWatermark() {
+    return 0;
+  }
+
+  @Override
+  public String toString() {
+    return topicPartitions.toString();
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
new file mode 100644
index 0000000..aeab8e0
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.CheckpointableWatermarkState;
+import org.apache.gobblin.runtime.StateStoreBasedWatermarkStorage;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
+import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaBiLevelWorkUnitPacker.bestFitDecreasingBinPacking;
+
+/**
+ *
+ * An implementation of {@link KafkaWorkUnitPacker} used for streaming Kafka ingestion, which:
+ *
+ * 1) Groups partitions of the same topic together. Multiple topics are never mixed in a base {@link WorkUnit},
+ * but may be mixed in a {@link MultiWorkUnit}.
+ * 2) Does not assign an offset range within the WorkUnit, but provides a list of partitions (or topics) to inform
+ * the streaming {@link org.apache.gobblin.source.extractor.Extractor} where to pull events from; the WorkUnit thus
+ * behaves as an "index" for the {@link org.apache.gobblin.source.extractor.Extractor}.
+ *
+ * It is then the {@link org.apache.gobblin.source.extractor.Extractor}'s responsibility to interact with
+ * {@link org.apache.gobblin.writer.WatermarkStorage} to determine the offset of each
+ * {@link org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition} that it is assigned.
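+ *
+ * For example (illustrative sketch), a squeezed WorkUnit for a topic with three assigned partitions carries the
+ * {@link KafkaSource#TOPIC_NAME} property, the per-partition {@link KafkaSource#PARTITION_ID} properties (indexed
+ * 0 through 2) and {@link #NUM_PARTITIONS_ASSIGNED}=3; the offsets to pull from are resolved by the extractor at
+ * runtime from the watermark storage.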
+ */
+@Slf4j
+public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
+  private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
+  private static final String TOPIC_PARTITION_DELIMITER = "-";
+
+  //A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a
+  //single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be
+  // packed into a single workunit. For example, if the container capacity is set to 10, and each topic partition has a
+  // weight of 1, then 10 partitions of the topic will be packed into a single workunit. This configuration is topic-independent
+  // i.e. all topics will be assumed to have the same peak consumption rate when set.
+  public static final String CONTAINER_CAPACITY_KEY = "gobblin.kafka.streaming.containerCapacity";
+  public static final double DEFAULT_CONTAINER_CAPACITY = 10;
+
+  //A boolean flag to enable per-topic container capacity, where "container capacity" is as defined earlier. This
+  // configuration is useful in scenarios where the write performance can vary significantly across topics due to differences
+  // in schema, as in the case of columnar formats such as ORC and Parquet. When enabled, the bin packing algorithm uses
+  // historic consumption rates for a given topic as tracked by the ingestion pipeline.
+
+  public static final String IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED_KEY = "gobblin.kafka.streaming.isPerTopicBinCapacityEnabled";
+  public static final Boolean DEFAULT_IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED = false;
+
+  //A topic-specific config that controls the minimum number of containers for that topic.
+  public static final String MIN_CONTAINERS_FOR_TOPIC = "gobblin.kafka.minContainersForTopic";
+  public static final String PARTITION_WATERMARK = "gobblin.kafka.partition.watermark";
+  public static final String PACKING_START_TIME_MILLIS = "gobblin.kafka.packer.packingStartTimeMillis";
+  public static final String IS_STATS_BASED_PACKING_ENABLED_KEY = "gobblin.kafka.streaming.isStatsBasedPackingEnabled";
+  public static final boolean DEFAULT_IS_STATS_BASED_PACKING_ENABLED = false;
+  public static final String CONTAINER_CAPACITY_COMPUTATION_STRATEGY_KEY = "gobblin.kafka.streaming.containerCapacityComputationStrategy";
+  public static final String DEFAULT_CONTAINER_CAPACITY_COMPUTATION_STRATEGY = ContainerCapacityComputationStrategy.MEDIAN.name();
+
+  public enum ContainerCapacityComputationStrategy {
+    MIN, MAX, MEAN, MEDIAN
+  }
+
+  private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class);
+
+  /**
+   * Configuration to enable indexing on packing.
+   */
+  private static final String INDEXING_ENABLED = "gobblin.kafka.streaming.enableIndexing";
+  private static final boolean DEFAULT_INDEXING_ENABLED = true;
+
+  private static final String METRICS_PREFIX = "binpacker.metrics.";
+
+  /**
+   * When indexing-based packing is enabled, this property tells the extractor how many Kafka partitions it
+   * needs to pull from.
+   *
+   * Public and static so that it can be shared with the Extractor.
+   * TODO: Should be changed to package-private.
+   */
+  public static final String NUM_PARTITIONS_ASSIGNED = "gobblin.kafka.streaming.numPartitions";
+  //A derived default workunit size, used when the workunit size cannot be estimated.
+  public static final String DEFAULT_WORKUNIT_SIZE_KEY = "gobblin.kafka.defaultWorkUnitSize";
+  //A lower bound for the workunit size.
+  public static final String MIN_WORKUNIT_SIZE_KEY = "gobblin.kafka.minWorkUnitSize";
+
+  private static final String NUM_CONTAINERS_EVENT_NAME = "NumContainers";
+
+  private final long packingStartTimeMillis;
+  private final Optional<StateStoreBasedWatermarkStorage> watermarkStorage;
+  private final Optional<MetricContext> metricContext;
+  private final boolean isStatsBasedPackingEnabled;
+  private final Boolean isPerTopicContainerCapacityEnabled;
+  private final ContainerCapacityComputationStrategy containerCapacityComputationStrategy;
+  private final Map<String, KafkaStreamingExtractor.KafkaWatermark> lastCommittedWatermarks = Maps.newHashMap();
+  private final Map<String, List<Double>> capacitiesByTopic = Maps.newHashMap();
+  private final EventSubmitter eventSubmitter;
+  private SourceState state;
+
+  public KafkaTopicGroupingWorkUnitPacker(AbstractSource<?, ?> source, SourceState state,
+      Optional<MetricContext> metricContext) {
+    super(source, state);
+    this.state = state;
+    this.isStatsBasedPackingEnabled =
+        state.getPropAsBoolean(IS_STATS_BASED_PACKING_ENABLED_KEY, DEFAULT_IS_STATS_BASED_PACKING_ENABLED);
+    this.isPerTopicContainerCapacityEnabled = state
+        .getPropAsBoolean(IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED_KEY, DEFAULT_IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED);
+    this.containerCapacityComputationStrategy = ContainerCapacityComputationStrategy.valueOf(
+        state.getProp(CONTAINER_CAPACITY_COMPUTATION_STRATEGY_KEY, DEFAULT_CONTAINER_CAPACITY_COMPUTATION_STRATEGY)
+            .toUpperCase());
+    this.watermarkStorage =
+        Optional.fromNullable(this.isStatsBasedPackingEnabled ? new StateStoreBasedWatermarkStorage(state) : null);
+    this.packingStartTimeMillis = System.currentTimeMillis();
+    this.metricContext = metricContext;
+    this.eventSubmitter =
+        new EventSubmitter.Builder(this.metricContext, KafkaTopicGroupingWorkUnitPacker.class.getName()).build();
+  }
+
+  /**
+   * Pack using the following strategy.
+   * - Each container has a configured capacity in terms of the cost metric.
+   *   This is configured by {@value CONTAINER_CAPACITY_KEY}.
+   * - For each topic, pack the workunits into a set of topic-specific buckets by filling the fullest bucket that
+   *   can hold the workunit without exceeding the container capacity.
+   * - Each topic-specific multi-workunit is squeezed and returned as a single workunit.
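+   *
+   * For example (illustrative numbers), with a container capacity of 10 and per-partition estimated sizes of
+   * [4, 4, 3, 3, 2] for a topic, best-fit-decreasing yields two groups: [4, 4, 2] and [3, 3].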
+   */
+  @Override
+  public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int numContainers) {
+    double containerCapacity = this.state.getPropAsDouble(CONTAINER_CAPACITY_KEY, DEFAULT_CONTAINER_CAPACITY);
+
+    if (this.watermarkStorage.isPresent()) {
+      try {
+        addStatsToWorkUnits(workUnitsByTopic);
+      } catch (IOException e) {
+        log.error("Unable to get stats from watermark storage.");
+        throw new RuntimeException(e);
+      }
+    }
+
+    setWorkUnitEstSizes(workUnitsByTopic);
+
+    List<MultiWorkUnit> mwuGroups = Lists.newArrayList();
+
+    for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
+      String topic = entry.getKey();
+      List<WorkUnit> workUnitsForTopic = entry.getValue();
+      if (this.isStatsBasedPackingEnabled && this.isPerTopicContainerCapacityEnabled) {
+        containerCapacity = getContainerCapacityForTopic(capacitiesByTopic.get(topic), this.containerCapacityComputationStrategy);
+        log.info("Container capacity for topic {}: {}", topic, containerCapacity);
+      }
+      double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic);
+      int previousSize = mwuGroups.size();
+      if (estimatedDataSizeForTopic < containerCapacity) {
+        // If the total estimated size of a topic is less than the container capacity, then put all partitions of
+        // this topic in a single group.
+        MultiWorkUnit mwuGroup = MultiWorkUnit.createEmpty();
+        addWorkUnitsToMultiWorkUnit(workUnitsForTopic, mwuGroup);
+        mwuGroups.add(mwuGroup);
+      } else {
+        // Use best-fit-decreasing to group workunits for a topic into multiple groups.
+        mwuGroups.addAll(bestFitDecreasingBinPacking(workUnitsForTopic, containerCapacity));
+      }
+      int numContainersForTopic = mwuGroups.size() - previousSize;
+      log.info("Packed partitions for topic {} into {} containers", topic, Integer.toString(numContainersForTopic));
+      if (this.metricContext.isPresent()) {
+        //Report the number of containers used for each topic.
+        String metricName = METRICS_PREFIX + topic + ".numContainers";
+        ContextAwareGauge<Integer> gauge =
+            this.metricContext.get().newContextAwareGauge(metricName, () -> numContainersForTopic);
+        this.metricContext.get().register(metricName, gauge);
+
+        //Submit a container count event for the given topic
+        CountEventBuilder countEventBuilder = new CountEventBuilder(NUM_CONTAINERS_EVENT_NAME, numContainersForTopic);
+        countEventBuilder.addMetadata("topic", topic);
+        this.eventSubmitter.submit(countEventBuilder);
+      }
+    }
+
+    List<WorkUnit> squeezedGroups = squeezeMultiWorkUnits(mwuGroups);
+    log.debug("Squeezed work unit groups: " + squeezedGroups);
+    return squeezedGroups;
+  }
+
+  /**
+   * Reads the average produce rates for each topic partition from watermark storage and adds them to the
+   * corresponding workunits. A side-effect of this method is to populate a map (called "capacitiesByTopic") of
+   * topicName to the peak consumption rates observed by JVMs for a given topic. This capacity limits the number
+   * of partitions of a topic grouped into a single workunit, and is computed from the historic peak consumption
+   * rates observed by different containers processing the topic, using the configured
+   * {@link ContainerCapacityComputationStrategy}.
+   *
+   * TODO: This method should be moved into {@link KafkaSource}, which requires moving classes such as
+   * {@link KafkaStreamingExtractor.KafkaWatermark} to open source.
+   * @param workUnitsByTopic
+   * @throws IOException
+   */
+  private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) throws IOException {
+    for (CheckpointableWatermarkState state : this.watermarkStorage.get().getAllCommittedWatermarks()) {
+      String topicPartition = state.getSource();
+      KafkaStreamingExtractor.KafkaWatermark watermark =
+          GSON.fromJson(state.getProp(topicPartition), KafkaStreamingExtractor.KafkaWatermark.class);
+      lastCommittedWatermarks.put(topicPartition, watermark);
+      if (this.isPerTopicContainerCapacityEnabled) {
+        String topicName = topicPartition.split(TOPIC_PARTITION_DELIMITER)[0];
+        List<Double> capacities = capacitiesByTopic.getOrDefault(topicName, Lists.newArrayList());
+        capacities.add(watermark.getAvgConsumeRate() > 0 ? watermark.getAvgConsumeRate() : DEFAULT_CONTAINER_CAPACITY);
+        capacitiesByTopic.put(topicName, capacities);
+      }
+    }
+
+    for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
+      String topic = entry.getKey();
+      List<WorkUnit> workUnits = entry.getValue();
+      for (WorkUnit workUnit : workUnits) {
+        int partitionId = Integer.parseInt(workUnit.getProp(KafkaSource.PARTITION_ID));
+        String topicPartition = new KafkaPartition.Builder().withTopicName(topic).withId(partitionId).toString();
+        KafkaStreamingExtractor.KafkaWatermark watermark = lastCommittedWatermarks.get(topicPartition);
+        workUnit.setProp(PARTITION_WATERMARK, GSON.toJson(watermark));
+        workUnit.setProp(PACKING_START_TIME_MILLIS, this.packingStartTimeMillis);
+        workUnit.setProp(DEFAULT_WORKUNIT_SIZE_KEY, getDefaultWorkUnitSize());
+        workUnit.setProp(MIN_WORKUNIT_SIZE_KEY, getMinWorkUnitSize(workUnit));
+      }
+    }
+  }
+
+  private Double getDefaultWorkUnitSize() {
+    return state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
+        KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER;
+  }
+
+  /**
+   * A helper method that computes the minimum workunit size for each topic partition based on the lower bound on
+   * the number of containers to be used for the topic.
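+   * For example (illustrative numbers), with a container capacity of 10, a topic with 20 partitions and
+   * {@link #MIN_CONTAINERS_FOR_TOPIC} set to 4, at most 20 / 4 = 5 partitions are packed per container, so the
+   * minimum workunit size is 10 / 5 = 2.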
+   * @param workUnit
+   * @return the minimum workunit size.
+   */
+  private Double getMinWorkUnitSize(WorkUnit workUnit) {
+    int minContainersForTopic = workUnit.getPropAsInt(MIN_CONTAINERS_FOR_TOPIC, -1);
+    if (minContainersForTopic == -1) {
+      //No minimum configured; return 0 as the lower bound for the workunit size.
+      return 0.0;
+    }
+
+    //Compute the maximum number of partitions to be packed into each container.
+    int maxNumPartitionsPerContainer = workUnit.getPropAsInt(KafkaSource.NUM_TOPIC_PARTITIONS) / minContainersForTopic;
+    return state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
+        KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / maxNumPartitionsPerContainer;
+  }
+
+  /**
+   * Indexes all partitions handled by a topic-specific MultiWorkUnit into a single WorkUnit by tracking only
+   * their topic and partition ids.
+   * The indexed WorkUnit does not have a pull offset assigned for each partition; offsets are resolved by the
+   * extractor at runtime.
+   */
+  @Override
+  protected List<WorkUnit> squeezeMultiWorkUnits(List<MultiWorkUnit> multiWorkUnits) {
+
+    if (state.getPropAsBoolean(INDEXING_ENABLED, DEFAULT_INDEXING_ENABLED)) {
+      List<WorkUnit> indexedWorkUnitList = new ArrayList<>();
+
+      // id to append to the task output directory to make it unique to avoid multiple flush publishers
+      // attempting to move the same file.
+      int uniqueId = 0;
+      for (MultiWorkUnit mwu : multiWorkUnits) {
+        // Select a sample WU.
+        WorkUnit indexedWorkUnit = mwu.getWorkUnits().get(0);
+        List<KafkaPartition> topicPartitions = getPartitionsFromMultiWorkUnit(mwu);
+
+        // Indexing all topics/partitions into this WU.
+        populateMultiPartitionWorkUnit(topicPartitions, indexedWorkUnit);
+
+        // Adding Number of Partitions as part of WorkUnit so that Extractor has clue on how many iterations to run.
+        indexedWorkUnit.setProp(NUM_PARTITIONS_ASSIGNED, topicPartitions.size());
+
+        // Need to make the task output directory unique to avoid file move conflicts in the flush publisher.
+        String outputDir = state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR);
+        indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(outputDir, Integer.toString(uniqueId++)));
+        indexedWorkUnitList.add(indexedWorkUnit);
+      }
+      return indexedWorkUnitList;
+    } else {
+      return super.squeezeMultiWorkUnits(multiWorkUnits);
+    }
+  }
+
+  /**
+   * Returns the container capacity for a given topic, derived from previously measured capacities.
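+   * For example (illustrative values), for capacities [1.0, 2.0, 4.0, 8.0]: MIN returns 1.0, MAX returns 8.0,
+   * MEAN returns 3.75 and MEDIAN returns (2.0 + 4.0) / 2 = 3.0.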
+   * @param capacities measured container capacities derived from watermarks
+   * @param strategy the algorithm to derive the container capacity from prior measurements
+   * @return the container capacity obtained using the given {@link ContainerCapacityComputationStrategy}.
+   */
+  @VisibleForTesting
+  static double getContainerCapacityForTopic(List<Double> capacities, ContainerCapacityComputationStrategy strategy) {
+    Preconditions.checkArgument(capacities.size() > 0, "Capacities size must be > 0");
+    Collections.sort(capacities);
+    log.info("Capacity computation strategy: {}, capacities: {}", strategy.name(), capacities);
+    switch (strategy) {
+      case MIN:
+        return capacities.get(0);
+      case MAX:
+        return capacities.get(capacities.size() - 1);
+      case MEAN:
+        return (capacities.stream().mapToDouble(capacity -> capacity).sum()) / capacities.size();
+      case MEDIAN:
+        return ((capacities.size() % 2) == 0) ? (
+            (capacities.get(capacities.size() / 2) + capacities.get(capacities.size() / 2 - 1)) / 2)
+            : capacities.get(capacities.size() / 2);
+      default:
+        throw new RuntimeException("Unsupported computation strategy: " + strategy.name());
+    }
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
new file mode 100644
index 0000000..6113a79
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+
+import java.util.Date;
+
+import com.google.gson.Gson;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaProduceRateTracker;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
+
+/**
+ * A {@link KafkaWorkUnitSizeEstimator} that uses historic produce rates of Kafka TopicPartitions and the current lag
+ * to determine the Workunit size. The inputs to the WorkUnitSizeEstimator are the following:
+ * <ul>
+ *   <li> Current Lag in number of records (L) </li>
+ *   <li> Average record size (R) </li>
+ *   <li> Historic produce rates by hour-of-day and day-of-week (P) </li>
+ *   <li> Target SLA to achieve zero lag (SLA) </li>
+ * </ul>
+ * Based on the current lag, historic produce rate for the Kafka TopicPartition, and
+ * a target SLA, we estimate the minimum consume rate (C) required to meet the target SLA using the following formula:
+ * C = (L * R)/SLA + P.
+ * To allow headroom for week-over-week and intra-hour variances, we scale the historic produce rates by
+ * an over-provisioning factor O. The formula is then modified to:
+ * C = (L * R)/SLA + (P * O).
+ * The calculated consumption rate C is returned as the estimated workunit size. Note that the estimated workunit size may exceed the
+ * container capacity. The bin packer is assumed to create a new bin containing only this workunit.
+ *
+ * Assumptions:
+ * <ul>
+ *   <li>The container capacity is assumed to be defined in MB/s</li>
+ *   <li>The topic partition produce rates are assumed to be tracked in bytes/s</li>
+ * </ul>
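+ *
+ * For example (illustrative numbers): with L = 3,600,000 records, R = 1024 bytes, SLA = 1 hour,
+ * P = 0.5 MB/s and O = 1.3, the estimated size is
+ * C = (3,600,000 * 1024) / (3600 * 1048576) + (0.5 * 1.3) ~= 0.98 + 0.65 ~= 1.63 MB/s.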
+ */
+@Slf4j
+public class ProduceRateAndLagBasedWorkUnitSizeEstimator implements KafkaWorkUnitSizeEstimator {
+  public static final String CATCHUP_SLA_IN_HOURS_KEY = "gobblin.kafka.catchUpSlaInHours";
+  private static final int DEFAULT_CATCHUP_SLA_IN_HOURS = 1;
+  //The interval at which workunits are re-calculated.
+  public static final String REPLANNING_INTERVAL_IN_HOURS_KEY = "gobblin.kafka.replanningIntervalInHours";
+  //Set default mode to disable time-based replanning i.e. set replanningIntervalInHours to its maximum value.
+  // In this case, the work unit weight estimation will select the maximum produce rate for the topic partition across
+  // all hours and days of week.
+  private static final int DEFAULT_KAFKA_REPLANNING_INTERVAL = 168; //24 * 7
+  //An over-provisioning factor to provide head room to allow variances in traffic e.g. sub-hour rate variances, week-over-week
+  // traffic variances etc. This config has the effect of multiplying the historic rate by this factor.
+  public static final String PRODUCE_RATE_SCALING_FACTOR_KEY = "gobblin.kafka.produceRateScalingFactor";
+  private static final Double DEFAULT_PRODUCE_RATE_SCALING_FACTOR = 1.3;
+
+  public static final int ONE_MEGA_BYTE = 1048576;
+  private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class);
+
+  private final long catchUpSlaInHours;
+  private final long replanIntervalInHours;
+  private final double produceRateScalingFactor;
+
+  public ProduceRateAndLagBasedWorkUnitSizeEstimator(SourceState state) {
+    this.catchUpSlaInHours = state.getPropAsLong(CATCHUP_SLA_IN_HOURS_KEY, DEFAULT_CATCHUP_SLA_IN_HOURS);
+    this.replanIntervalInHours =
+        state.getPropAsLong(REPLANNING_INTERVAL_IN_HOURS_KEY, DEFAULT_KAFKA_REPLANNING_INTERVAL);
+    this.produceRateScalingFactor =
+        state.getPropAsDouble(PRODUCE_RATE_SCALING_FACTOR_KEY, DEFAULT_PRODUCE_RATE_SCALING_FACTOR);
+  }
+
+  @Override
+  public double calcEstimatedSize(WorkUnit workUnit) {
+    KafkaStreamingExtractor.KafkaWatermark watermark =
+        GSON.fromJson(workUnit.getProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK),
+            KafkaStreamingExtractor.KafkaWatermark.class);
+    String topic = workUnit.getProp(KafkaSource.TOPIC_NAME);
+    String partition = workUnit.getProp(KafkaSource.PARTITION_ID);
+
+    double[][] avgProduceRates = null;
+    long avgRecordSize = 0L;
+    long offsetLag = Long.parseLong(workUnit.getProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY));
+
+    if (watermark != null) {
+      avgProduceRates = watermark.getAvgProduceRates();
+      avgRecordSize = watermark.getAvgRecordSize();
+      offsetLag = offsetLag - watermark.getLwm().getValue();
+    }
+
+    double maxProduceRate = getMaxProduceRateUntilNextReplan(avgProduceRates,
+        workUnit.getPropAsLong(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS));
+
+    if (maxProduceRate < 0) {
+      //No previous estimates found.
+      log.debug("No previous produce rate estimate found for {}", topic + "-" + partition);
+      maxProduceRate = workUnit.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY);
+    }
+
+    //Compute the target consume rate in MB/s.
+    double targetConsumeRate =
+        ((double) (offsetLag * avgRecordSize) / (catchUpSlaInHours * 3600 * ONE_MEGA_BYTE)) + (maxProduceRate
+            * produceRateScalingFactor);
+    log.debug("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg Record size: {}, Target Consume Rate: {}",
+        topic + ":" + partition, maxProduceRate, offsetLag, avgRecordSize, targetConsumeRate);
+    return targetConsumeRate;
+  }
+
+  /**
+   * @param avgProduceRates
+   * @param packingTimeMillis
+   * @return the maximum produce rate in MB/s observed within the time window of [packingTimeMillis, packingTimeMillis + replanIntervalInHours].
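+   * For example (illustrative), if packing starts at 23:00 on a Saturday and the replanning interval is 3 hours,
+   * the produce rates for (Saturday, 23:00), (Sunday, 00:00), (Sunday, 01:00) and (Sunday, 02:00) are scanned and
+   * the maximum among them is returned, converted to MB/s.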
+   */
+  private double getMaxProduceRateUntilNextReplan(double[][] avgProduceRates, long packingTimeMillis) {
+    int dayOfWeek = KafkaProduceRateTracker.getDayOfWeek(new Date(packingTimeMillis));
+    int hourOfDay = KafkaProduceRateTracker.getHourOfDay(new Date(packingTimeMillis));
+
+    if (avgProduceRates == null) {
+      return -1.0;
+    }
+
+    double max = avgProduceRates[dayOfWeek][hourOfDay];
+    for (int i = 0; i < replanIntervalInHours; i++) {
+      if ((hourOfDay + 1) >= 24) {
+        dayOfWeek = (dayOfWeek + 1) % 7;
+      }
+      hourOfDay = (hourOfDay + 1) % 24;
+      if (max < avgProduceRates[dayOfWeek][hourOfDay]) {
+        max = avgProduceRates[dayOfWeek][hourOfDay];
+      }
+    }
+    return max / ONE_MEGA_BYTE;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/UnitKafkaWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/UnitKafkaWorkUnitSizeEstimator.java
new file mode 100644
index 0000000..efcd8cf
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/UnitKafkaWorkUnitSizeEstimator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * A trivial implementation of {@link KafkaWorkUnitSizeEstimator} that returns a unit size ("1") from
+ * {@link #calcEstimatedSize(WorkUnit)}, for cases where a per-{@link WorkUnit} size estimate is not useful.
+ */
+public class UnitKafkaWorkUnitSizeEstimator implements KafkaWorkUnitSizeEstimator {
+  public UnitKafkaWorkUnitSizeEstimator(SourceState state) {
+    // do nothing
+  }
+
+  @Override
+  public double calcEstimatedSize(WorkUnit workUnit) {
+    return 1;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
index e1635ea..03fd945 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
@@ -40,6 +41,7 @@ public class LoggingPusherTest {
 
     TestAppender testAppender = new TestAppender();
     Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
+    logger.setLevel(Level.INFO);
     logger.addAppender(testAppender);
 
     KeyValuePusher<String, String> loggingPusher =
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorUtils.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorUtils.java
new file mode 100644
index 0000000..649cb0d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.File;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.runtime.StateStoreBasedWatermarkStorage;
+import org.apache.gobblin.source.extractor.WatermarkInterval;
+import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.JOB_NAME_KEY;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WATERMARK_INTERVAL_VALUE_KEY;
+
+@Slf4j
+public class KafkaExtractorUtils {
+  private static final Integer MAX_NUM_BROKERS = 100;
+  private static final Integer MAX_NUM_TOPIC_PARTITIONS = 1024;
+
+  /**
+   * A utility method that returns a {@link WorkUnitState} which can be used to instantiate both a batch and a
+   * streaming Kafka extractor.
+   * @param topicName
+   * @param numPartitions
+   * @return a {@link WorkUnitState} configured for the given topic and number of partitions.
+   */
+  public static WorkUnitState getWorkUnitState(String topicName, int numPartitions) {
+    Preconditions.checkArgument(numPartitions <= MAX_NUM_TOPIC_PARTITIONS, "Num partitions assigned "
+        + "must not exceed the maximum number of partitions of the topic");
+    WorkUnitState state = new WorkUnitState();
+    state.setProp(JOB_NAME_KEY, "testJob");
+    state.setProp(KafkaSource.TOPIC_NAME, topicName);
+    state.setProp(KafkaSource.GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, KafkaStreamTestUtils.MockKafka10ConsumerClientFactory.class.getName());
+    state.setProp(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+        KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
+    state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS, KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
+    //Need to set this property for LiKafka10ConsumerClient instantiation
+    state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, "http://dummySchemaRegistry:1000");
+    Random random = new Random();
+    for (int i=0; i<numPartitions; i++) {
+      //Assign a random partition id.
+      state.setProp(KafkaSource.PARTITION_ID + "." + i, random.nextInt(MAX_NUM_TOPIC_PARTITIONS));
+      state.setProp(KafkaSource.LEADER_ID + "." + i, random.nextInt(MAX_NUM_BROKERS));
+      state.setProp(KafkaSource.LEADER_HOSTANDPORT + "." + i, "leader-" + i + ":9091");
+    }
+    state.setProp(KafkaTopicGroupingWorkUnitPacker.NUM_PARTITIONS_ASSIGNED, numPartitions);
+    //Configure the watermark storage. We use FileContextBasedFsStateStoreFactory, since it allows for overwriting an existing
+    // state store.
+    state.setProp(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_TYPE_KEY, FileContextBasedFsStateStoreFactory.class.getName());
+    File stateStoreDir = Files.createTempDir();
+    stateStoreDir.deleteOnExit();
+    state.setProp(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_CONFIG_PREFIX + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, stateStoreDir.getAbsolutePath());
+
+    //Kafka configurations
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS, "localhost:9091");
+
+    // Constructing a dummy watermark and mock client-factory-class and registry-class
+    List<Long> dummyWatermark = ImmutableList.of(1L);
+    WatermarkInterval watermarkInterval =
+        new WatermarkInterval(new MultiLongWatermark(dummyWatermark), new MultiLongWatermark(dummyWatermark));
+    state.setProp(WATERMARK_INTERVAL_VALUE_KEY, watermarkInterval.toJson());
+
+    state.setWuProperties(state.getProperties(), state.getProperties());
+    return state;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
new file mode 100644
index 0000000..2dbc351
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.joda.time.LocalDate;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.EvictingQueue;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.writer.LastWatermarkTracker;
+import org.apache.gobblin.writer.WatermarkTracker;
+
+
+public class KafkaProduceRateTrackerTest {
+  private static final LocalDate HOLIDAY_DATE = new LocalDate(2019, 12, 25);
+  private static final LocalDate NON_HOLIDAY_DATE = new LocalDate(2020, 01, 05);
+  private static final Long HOLIDAY_TIME = HOLIDAY_DATE.toDateTimeAtStartOfDay().toInstant().getMillis();
+  private static final Long NON_HOLIDAY_TIME = NON_HOLIDAY_DATE.toDateTimeAtStartOfDay().toInstant().getMillis();
+
+  private KafkaProduceRateTracker tracker;
+  private List<KafkaPartition> kafkaPartitions = new ArrayList<>();
+  private WatermarkTracker watermarkTracker;
+  private WorkUnitState workUnitState;
+  private KafkaExtractorStatsTracker extractorStatsTracker;
+
+  @BeforeClass
+  public void setUp() {
+    kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build());
+    kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build());
+    this.workUnitState = new WorkUnitState();
+    this.workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 5L);
+    this.watermarkTracker = new LastWatermarkTracker(false);
+    this.extractorStatsTracker = new KafkaExtractorStatsTracker(this.workUnitState, kafkaPartitions);
+  }
+
+  private void assertTopicPartitionOrder(KafkaProduceRateTracker tracker, List<KafkaPartition> partitions) {
+    Iterator<KafkaPartition> keyIterator = tracker.getPartitionsToProdRate().keySet().iterator();
+    for (KafkaPartition partition : partitions) {
+      Assert.assertEquals(partition, keyIterator.next());
+    }
+  }
+
+  private void writeProduceRateToKafkaWatermarksHelper(long readStartTime, long decodeStartTime, long currentTime) {
+    this.extractorStatsTracker.reset();
+    assertTopicPartitionOrder(tracker, kafkaPartitions);
+    extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, currentTime-8000, currentTime-10000);
+    readStartTime++;
+    decodeStartTime++;
+    extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 200, currentTime-7000, currentTime-9000);
+    extractorStatsTracker.updateStatisticsForCurrentPartition(0, readStartTime, currentTime - 8000);
+    extractorStatsTracker.updateStatisticsForCurrentPartition(1, readStartTime, currentTime - 7000);
+
+    MultiLongWatermark highWatermark = new MultiLongWatermark(Lists.newArrayList(20L, 30L));
+    Map<KafkaPartition, Long> latestOffsetMap = Maps.newHashMap();
+    latestOffsetMap.put(kafkaPartitions.get(0), 35L);
+    latestOffsetMap.put(kafkaPartitions.get(1), 47L);
+    Map<String, CheckpointableWatermark> lastCommittedWatermarks = Maps.newHashMap();
+
+    KafkaPartition topicPartition1 = kafkaPartitions.get(0);
+    KafkaPartition topicPartition2 = kafkaPartitions.get(1);
+
+    lastCommittedWatermarks.put(topicPartition1.toString(),
+        new KafkaStreamingExtractor.KafkaWatermark(topicPartition1, new LongWatermark(5L)));
+    lastCommittedWatermarks.put(topicPartition2.toString(),
+        new KafkaStreamingExtractor.KafkaWatermark(topicPartition2, new LongWatermark(7L)));
+    this.tracker.writeProduceRateToKafkaWatermarks(latestOffsetMap, lastCommittedWatermarks, highWatermark, currentTime);
+  }
+
+  private void assertKafkaWatermarks(long currentTime) {
+    Map<String, CheckpointableWatermark> unacknowledgedWatermarks = watermarkTracker.getAllUnacknowledgedWatermarks();
+    Assert.assertEquals(unacknowledgedWatermarks.size(), 2);
+
+    KafkaPartition topicPartition1 = kafkaPartitions.get(0);
+    KafkaPartition topicPartition2 = kafkaPartitions.get(1);
+
+    for (KafkaPartition topicPartition : Lists.newArrayList(topicPartition1, topicPartition2)) {
+      KafkaStreamingExtractor.KafkaWatermark kafkaWatermark = (KafkaStreamingExtractor.KafkaWatermark) unacknowledgedWatermarks.get(topicPartition.toString());
+      if (currentTime == HOLIDAY_TIME + 10) {
+        Assert.assertTrue(kafkaWatermark.avgProduceRates == null);
+        continue;
+      }
+
+      Assert.assertTrue(kafkaWatermark.avgProduceRates != null);
+      Date date = new Date(currentTime);
+      int hourOfDay = KafkaProduceRateTracker.getHourOfDay(date);
+      int dayOfWeek = KafkaProduceRateTracker.getDayOfWeek(date);
+
+      Assert.assertTrue(kafkaWatermark.avgProduceRates[dayOfWeek][hourOfDay] > 0);
+      for (int i = 0; i < 7; i++) {
+        for (int j = 0; j < 24; j++) {
+          if (i != dayOfWeek || j != hourOfDay) {
+            Assert.assertTrue(kafkaWatermark.avgProduceRates[i][j] < 0);
+          }
+        }
+      }
+      Assert.assertTrue(kafkaWatermark.getAvgConsumeRate() > 0);
+    }
+  }
+
+  @Test
+  public void testWriteProduceRateToKafkaWatermarks() {
+    long readStartTime = System.nanoTime();
+    long decodeStartTime = readStartTime + 1;
+    long currentTime = System.currentTimeMillis();
+
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY, false);
+    workUnitState.setProp(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY, 1L);
+    this.tracker = new KafkaProduceRateTracker(workUnitState, kafkaPartitions, watermarkTracker, extractorStatsTracker, currentTime);
+
+    //Bootstrap the extractorStatsTracker
+    writeProduceRateToKafkaWatermarksHelper(readStartTime, decodeStartTime, currentTime);
+
+    for (int i = 1; i < KafkaProduceRateTracker.SLIDING_WINDOW_SIZE + 1; i++) {
+      //Add more records and update watermarks/stats
+      writeProduceRateToKafkaWatermarksHelper(readStartTime + 1000 + i, decodeStartTime + 1000 + i,
+          currentTime + i);
+    }
+
+    //Ensure the watermark's avg produce rate is non-null and > 0 for the hour-of-day and day-of-week corresponding to currentTime
+    assertKafkaWatermarks(currentTime + KafkaProduceRateTracker.SLIDING_WINDOW_SIZE);
+  }
+
+  @Test (dependsOnMethods = "testWriteProduceRateToKafkaWatermarks")
+  public void testWriteProduceRateToKafkaWatermarksWithHolidays() {
+    long readStartTime = TimeUnit.MILLISECONDS.toNanos(HOLIDAY_TIME);
+    long decodeStartTime = readStartTime + 1;
+    long currentTime = HOLIDAY_TIME + 10;
+
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY, true);
+    workUnitState.setProp(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY, 1L);
+    this.tracker = new KafkaProduceRateTracker(workUnitState, kafkaPartitions, watermarkTracker, extractorStatsTracker, currentTime);
+
+    //Bootstrap the extractorStatsTracker; holiday stats collection is disabled.
+    writeProduceRateToKafkaWatermarksHelper(readStartTime, decodeStartTime, currentTime);
+
+    //Add more records and update watermarks/stats
+    writeProduceRateToKafkaWatermarksHelper(readStartTime + 1000, decodeStartTime + 1000, currentTime + 1);
+
+    //Since stats collection is disabled on holidays, ensure the avg produce rates in the watermarks are null.
+    assertKafkaWatermarks(currentTime);
+
+    readStartTime = TimeUnit.MILLISECONDS.toNanos(NON_HOLIDAY_TIME);
+    decodeStartTime = readStartTime + 1;
+    currentTime = NON_HOLIDAY_TIME + 10;
+
+    //Bootstrap the extractorStatsTracker with initial records
+    writeProduceRateToKafkaWatermarksHelper(readStartTime, decodeStartTime, currentTime);
+
+    for (int i = 1; i < KafkaProduceRateTracker.SLIDING_WINDOW_SIZE + 1; i++) {
+      //Add more records and update watermarks/stats
+      writeProduceRateToKafkaWatermarksHelper(readStartTime + 1000 + i, decodeStartTime + 1000 + i,
+          currentTime + i);
+    }
+    //Ensure the watermark's avg produce rate is non-null and > 0 for the hour-of-day and day-of-week corresponding to currentTime
+    assertKafkaWatermarks(currentTime + KafkaProduceRateTracker.SLIDING_WINDOW_SIZE);
+  }
+
+  @Test
+  public void testIsHoliday() {
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY, true);
+    KafkaExtractorStatsTracker extractorStatsTracker = new KafkaExtractorStatsTracker(this.workUnitState, kafkaPartitions);
+    KafkaProduceRateTracker tracker = new KafkaProduceRateTracker(workUnitState, kafkaPartitions, watermarkTracker, extractorStatsTracker);
+    Assert.assertTrue(tracker.isHoliday(HOLIDAY_DATE));
+    //Ensure that the caching behavior is correct
+    Assert.assertTrue(tracker.isHoliday(HOLIDAY_DATE));
+    Assert.assertFalse(tracker.isHoliday(NON_HOLIDAY_DATE));
+  }
+
+  @Test
+  public void testGetPenultimateElement() {
+    EvictingQueue<Double> queue = EvictingQueue.create(3);
+
+    queue.add(1.0);
+    queue.add(2.0);
+    queue.add(3.0);
+
+    Double element = KafkaProduceRateTracker.getPenultimateElement(queue);
+    Assert.assertEquals(element, 2.0);
+
+    queue.add(4.0);
+    element = KafkaProduceRateTracker.getPenultimateElement(queue);
+    Assert.assertEquals(element, 3.0);
+
+    queue.add(5.0);
+    element = KafkaProduceRateTracker.getPenultimateElement(queue);
+    Assert.assertEquals(element, 4.0);
+  }
+}
\ No newline at end of file
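
For readers of the test above: KafkaProduceRateTracker.getPenultimateElement is exercised against a Guava EvictingQueue, but its implementation is not part of this diff. A minimal sketch of a helper with the behavior the test expects (the second-to-last element in insertion order, assuming the queue holds at least two elements) could look like the following; this is an illustrative reading of the test, not the committed code:

import java.util.Iterator;

import com.google.common.collect.EvictingQueue;

public class PenultimateElementSketch {
  // Walks the queue in insertion (oldest-to-newest) order and returns the
  // second-to-last element; assumes the queue holds at least two elements.
  static Double getPenultimateElement(EvictingQueue<Double> queue) {
    Iterator<Double> it = queue.iterator();
    Double previous = it.next();
    Double current = it.next();
    while (it.hasNext()) {
      previous = current;
      current = it.next();
    }
    return previous;
  }

  public static void main(String[] args) {
    EvictingQueue<Double> queue = EvictingQueue.create(3);
    queue.add(1.0);
    queue.add(2.0);
    queue.add(3.0);
    System.out.println(getPenultimateElement(queue)); // 2.0, as the test expects
    queue.add(4.0);                                   // evicts 1.0
    System.out.println(getPenultimateElement(queue)); // 3.0
  }
}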
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
new file mode 100644
index 0000000..731c0b1
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.mockito.Mockito;
+
+import com.codahale.metrics.Metric;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * A collection of basic mock classes that can be used by different implementations to mock Kafka clients.
+ */
+public class KafkaStreamTestUtils {
+  public static class MockKafkaConsumerClientFactory
+      implements GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory {
+    static final AbstractBaseKafkaConsumerClient MOCKED_KAFKA_CLIENT = mock(AbstractBaseKafkaConsumerClient.class);
+
+    @Override
+    public GobblinKafkaConsumerClient create(Config config) {
+      return MOCKED_KAFKA_CLIENT;
+    }
+  }
+
+  public static class MockKafka10ConsumerClientFactory
+      implements GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory {
+    @Override
+    public GobblinKafkaConsumerClient create(Config config) {
+      return new MockKafkaConsumerClient(config);
+    }
+  }
+
+  /**
+   * A mock implementation of {@link GobblinKafkaConsumerClient} that returns a {@link MockIterator} on
+   * invocation of the {@link MockKafkaConsumerClient#consume()} method.
+   */
+  public static class MockKafkaConsumerClient implements GobblinKafkaConsumerClient {
+    public static final String NUM_PARTITIONS_ASSIGNED = "gobblin.kafka.streaming.numPartitions";
+
+    private final Map<KafkaPartition, Long> latestOffsets = Maps.newHashMap();
+    private final Random random = new Random();
+    private final String topicName;
+    private final List<Integer> partitionIds;
+
+    protected MockKafkaConsumerClient(Config baseConfig) {
+      this.topicName = baseConfig.getString(KafkaSource.TOPIC_NAME);
+      int numPartitionsAssigned = ConfigUtils.getInt(baseConfig, NUM_PARTITIONS_ASSIGNED, 0);
+      this.partitionIds = getPartitionIds(baseConfig, numPartitionsAssigned);
+    }
+
+    public MockKafkaConsumerClient() {
+      topicName = "";
+      partitionIds = Lists.newArrayList();
+    }
+
+    private List<Integer> getPartitionIds(Config baseConfig, int numPartitionsAssigned) {
+      List<Integer> partitionIds = Lists.newArrayList();
+      for (int i = 0; i < numPartitionsAssigned; i++) {
+        String partitionIdProp = KafkaSource.PARTITION_ID + "." + i;
+        partitionIds.add(baseConfig.getInt(partitionIdProp));
+      }
+      return partitionIds;
+    }
+
+    /**
+     *
+     * @return a {@link MockIterator} over {@link KafkaConsumerRecord}s.
+     */
+    @Override
+    public Iterator<KafkaConsumerRecord> consume() {
+      return new MockIterator(this.topicName, this.partitionIds);
+    }
+
+    @Override
+    public void assignAndSeek(List<KafkaPartition> topicPartitions,
+        Map<KafkaPartition, LongWatermark> topicWatermarks) {
+      return;
+    }
+
+    @Override
+    public List<KafkaTopic> getFilteredTopics(List<Pattern> blacklist, List<Pattern> whitelist) {
+      return null;
+    }
+
+    @Override
+    public long getEarliestOffset(KafkaPartition partition)
+        throws KafkaOffsetRetrievalFailureException {
+      return 0;
+    }
+
+    @Override
+    public long getLatestOffset(KafkaPartition partition)
+        throws KafkaOffsetRetrievalFailureException {
+      return 0;
+    }
+
+    /**
+     * Returns a random offset for each {@link KafkaPartition}. The method ensures that the offsets are monotonically
+     * increasing for each {@link KafkaPartition}, i.e. each subsequent call to the method will return a higher offset
+     * for every partition in the partition list.
+     * @param partitions the partitions whose latest offsets are requested
+     * @return a map from each partition to its (monotonically increasing) latest offset
+     */
+    @Override
+    public Map<KafkaPartition, Long> getLatestOffsets(Collection<KafkaPartition> partitions) {
+      for (KafkaPartition partition : partitions) {
+        if (this.latestOffsets.containsKey(partition)) {
+          this.latestOffsets.put(partition, this.latestOffsets.get(partition) + 100);
+        } else {
+          this.latestOffsets.put(partition, (long) random.nextInt(100000));
+        }
+      }
+      return this.latestOffsets;
+    }
+
+    @Override
+    public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
+      return null;
+    }
+
+    @Override
+    public Map<String, Metric> getMetrics() {
+      return new HashMap<>();
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+
+    }
+  }
+
+  public static class MockSchemaRegistry extends KafkaSchemaRegistry<String, Schema> {
+    static Schema latestSchema = Schema.create(Schema.Type.STRING);
+
+    public MockSchemaRegistry(Properties props) {
+      super(props);
+    }
+
+    @Override
+    protected Schema fetchSchemaByKey(String key) throws SchemaRegistryException {
+      return null;
+    }
+
+    @Override
+    public Schema getLatestSchemaByTopic(String topic) throws SchemaRegistryException {
+      return latestSchema;
+    }
+
+    @Override
+    public String register(Schema schema) throws SchemaRegistryException {
+      return null;
+    }
+
+    @Override
+    public String register(Schema schema, String name) throws SchemaRegistryException {
+      this.latestSchema = schema;
+      return schema.toString();
+    }
+  }
+
+  public static class LowLevelMockSchemaRegistry
+      implements org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry<String, Schema> {
+    private Schema latestSchema;
+
+    public LowLevelMockSchemaRegistry(Properties props) {
+    }
+
+    @Override
+    public String register(String name, Schema schema)
+        throws org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+      this.latestSchema = schema;
+      return schema.toString();
+    }
+
+    @Override
+    public Schema getById(String id) throws IOException, org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+      return null;
+    }
+
+    @Override
+    public Schema getLatestSchema(String name)
+        throws IOException, org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+      return this.latestSchema;
+    }
+
+    @Override
+    public boolean hasInternalCache() {
+      return false;
+    }
+  }
+
+  /**
+   * A mock iterator of {@link KafkaConsumerRecord}s. The caller provides a topicName, a list of partition ids and
+   * optionally, the number of records the iterator must iterate over. On each call to next(), the iterator returns
+   * a mock {@link KafkaConsumerRecord}, with a partition id assigned in a round-robin fashion over the input list of
+   * partition ids.
+   */
+  public static class MockIterator implements Iterator<KafkaConsumerRecord> {
+    //Schema for LiKafka10ConsumerRecords. TODO: Enhance the iterator to return random records
+    // according to a given schema.
+    private static final String SCHEMA =
+        "{\"namespace\": \"example.avro\",\n" + " \"type\": \"record\",\n" + " \"name\": \"user\",\n"
+            + " \"fields\": [\n" + "     {\"name\": \"name\", \"type\": \"string\"},\n"
+            + "     {\"name\": \"DUMMY\", \"type\": [\"null\",\"string\"]}\n" + " ]\n" + "}";
+
+    private final Schema schema = new Schema.Parser().parse(SCHEMA);
+    private final Random random = new Random();
+    private final String topicName;
+    private final long maxNumRecords;
+    private final List<Integer> partitionIds;
+    private final long[] nextOffsets;
+    private long numRecordsReturnedSoFar;
+    private int partitionIdx = 0;
+
+    public MockIterator(String topicName, List<Integer> partitionIds) {
+      this(topicName, partitionIds, Long.MAX_VALUE);
+    }
+
+    public MockIterator(String topicName, List<Integer> partitionIds, long numRecords) {
+      this.topicName = topicName;
+      this.maxNumRecords = numRecords;
+      this.partitionIds = partitionIds;
+      this.nextOffsets = new long[partitionIds.size()];
+    }
+
+    /**
+     * Returns {@code true} if the iteration has more elements.
+     * (In other words, returns {@code true} if {@link #next} would
+     * return an element rather than throwing an exception.)
+     *
+     * @return {@code true} if the iteration has more elements
+     */
+    @Override
+    public boolean hasNext() {
+      return this.numRecordsReturnedSoFar < this.maxNumRecords;
+    }
+
+    /**
+     * Returns the next element in the iteration.
+     *
+     * @return the next element in the iteration
+     * @throws java.util.NoSuchElementException if the iteration has no more elements
+     */
+    @Override
+    public KafkaConsumerRecord next() {
+      this.numRecordsReturnedSoFar++;
+      return getMockConsumerRecord(this.numRecordsReturnedSoFar);
+    }
+
+    private KafkaConsumerRecord getMockConsumerRecord(long numRecordsReturnedSoFar) {
+      DecodeableKafkaRecord mockRecord = Mockito.mock(DecodeableKafkaRecord.class);
+      Mockito.when(mockRecord.getValue()).thenReturn(getRecord());
+      Mockito.when(mockRecord.getTopic()).thenReturn(topicName);
+      Mockito.when(mockRecord.getPartition()).thenReturn(this.partitionIds.get(partitionIdx));
+      //Increment the next offset of the partition this record belongs to and report it on the record
+      this.nextOffsets[partitionIdx]++;
+      Mockito.when(mockRecord.getNextOffset()).thenReturn(this.nextOffsets[partitionIdx]);
+      this.partitionIdx = (this.partitionIdx + 1) % this.partitionIds.size(); //Advance round-robin to the next partition
+      return mockRecord;
+    }
+
+    private GenericRecord getRecord() {
+      GenericRecord record = new GenericData.Record(schema);
+      record.put("name", UUID.randomUUID());
+      return record;
+    }
+  }
+
+
+}
+
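
As a usage sketch (not part of the commit), the MockIterator above can be driven directly to observe the round-robin partition assignment and the fabricated per-partition offsets; the topic name, partition ids, and record count below are arbitrary test values:

package org.apache.gobblin.source.extractor.extract.kafka;

import java.util.Iterator;

import com.google.common.collect.Lists;

import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;

public class MockIteratorUsageSketch {
  public static void main(String[] args) {
    // Ask for 4 records spread round-robin across partitions 0 and 1 of a hypothetical topic.
    Iterator<KafkaConsumerRecord> it =
        new KafkaStreamTestUtils.MockIterator("testTopic", Lists.newArrayList(0, 1), 4);
    while (it.hasNext()) {
      // Each record returned by the mock is a Mockito-stubbed DecodeableKafkaRecord.
      DecodeableKafkaRecord record = (DecodeableKafkaRecord) it.next();
      System.out.println(record.getTopic() + "-" + record.getPartition()
          + " nextOffset=" + record.getNextOffset());
    }
  }
}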
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
new file mode 100644
index 0000000..1cf7ff2
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+
+
+public class KafkaStreamingExtractorTest {
+  private KafkaStreamingExtractor streamingExtractor;
+  private final int numPartitions = 3;
+
+  @BeforeClass
+  public void setUp() {
+    WorkUnitState state = KafkaExtractorUtils.getWorkUnitState("testTopic", numPartitions);
+    state.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS, TestDataPublisher.class.getName());
+    this.streamingExtractor = new KafkaStreamingExtractor(state);
+  }
+
+  @Test
+  public void testResetExtractorStats()
+      throws IOException, DataRecordException {
+    MultiLongWatermark highWatermark1 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
+
+    //Read 2 records
+    this.streamingExtractor.readStreamEntityImpl();
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 0L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
+
+    this.streamingExtractor.readStreamEntityImpl();
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
+
+    //Checkpoint watermarks
+    this.streamingExtractor.onFlushAck();
+
+    //Reset extractor stats
+    this.streamingExtractor.resetExtractorStatsAndWatermarks(false);
+
+    //Ensure the post-reset invariant is satisfied, i.e. the low watermark and next watermark are identical.
+    testAfterReset(highWatermark1);
+
+    MultiLongWatermark highWatermark2 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
+    //Read 1 more record
+    this.streamingExtractor.readStreamEntityImpl();
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 1L);
+
+    Assert.assertEquals(this.streamingExtractor.lowWatermark.get(0), 1L);
+    Assert.assertEquals(this.streamingExtractor.lowWatermark.get(1), 1L);
+    Assert.assertEquals(this.streamingExtractor.lowWatermark.get(2), 0L);
+
+    //Checkpoint watermarks
+    this.streamingExtractor.onFlushAck();
+
+    //Reset extractor stats
+    this.streamingExtractor.resetExtractorStatsAndWatermarks(false);
+
+    //Ensure the post-reset invariant is satisfied.
+    testAfterReset(highWatermark2);
+  }
+
+  private void testAfterReset(MultiLongWatermark previousHighWatermark) {
+    //Ensure that low and next watermarks are identical after reset. Also ensure the new high watermark is greater than
+    // the previous high watermark.
+    for (int i = 0; i < numPartitions; i++) {
+      Assert.assertEquals(this.streamingExtractor.lowWatermark.get(i), this.streamingExtractor.nextWatermark.get(i));
+      Assert.assertTrue(previousHighWatermark.get(i) <= this.streamingExtractor.highWatermark.get(i));
+    }
+  }
+
+  @Test
+  public void testGenerateAdditionalTagHelper() throws Exception {
+    // Verifying that produce rate has been added.
+    Map<KafkaPartition, Map<String, String>> result = this.streamingExtractor.getAdditionalTagsHelper();
+    for (Map<String, String> entry: result.values()) {
+      Assert.assertTrue(entry.containsKey(KafkaProduceRateTracker.KAFKA_PARTITION_PRODUCE_RATE_KEY));
+    }
+  }
+
+  static class TestDataPublisher extends DataPublisher {
+    public TestDataPublisher(WorkUnitState state) {
+      super(state);
+    }
+
+    @Override
+    public void initialize() {
+    }
+
+    @Override
+    public void publishData(Collection<? extends WorkUnitState> states) {
+
+    }
+
+    @Override
+    public void publishMetadata(Collection<? extends WorkUnitState> states) {
+
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+
+    }
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
new file mode 100644
index 0000000..eb676d4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
+import org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class KafkaTopicGroupingWorkUnitPackerTest {
+  private Properties props;
+
+  @BeforeMethod
+  public void setUp() {
+    props = new Properties();
+    props.setProperty("gobblin.kafka.streaming.containerCapacity", "2");
+    props.setProperty("kafka.workunit.size.estimator.type", "CUSTOM");
+    props.setProperty("kafka.workunit.size.estimator.customizedType",
+        "org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator");
+  }
+
+  /**
+   * Check that capacity is honored.
+   */
+  @Test
+  public void testSingleTopic() {
+    KafkaSource source = new UniversalKafkaSource();
+    SourceState state = new SourceState(new State(props));
+    state.setProp("gobblin.kafka.streaming.enableIndexing", false);
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, Files.createTempDir().getAbsolutePath());
+
+    Map<String, List<WorkUnit>> workUnitsByTopic = ImmutableMap.of("topic1", Lists
+        .newArrayList(getWorkUnitWithTopicPartition("topic1", 1), getWorkUnitWithTopicPartition("topic1", 2),
+            getWorkUnitWithTopicPartition("topic1", 3)));
+
+    List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10);
+    Assert.assertEquals(workUnits.size(), 2);
+    Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1");
+    Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 1);
+    Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 1)), 2);
+    Assert.assertEquals(workUnits.get(1).getProp(KafkaSource.TOPIC_NAME), "topic1");
+    Assert.assertEquals(workUnits.get(1).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 3);
+  }
+
+  /**
+   * Check that topics are kept in separate work units.
+   */
+  @Test
+  public void testMultiTopic() {
+    KafkaSource source = new UniversalKafkaSource();
+    SourceState state = new SourceState(new State(props));
+    state.setProp("gobblin.kafka.streaming.enableIndexing", false);
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, Files.createTempDir().getAbsolutePath());
+
+    Map<String, List<WorkUnit>> workUnitsByTopic = ImmutableMap.of("topic1", Lists
+        .newArrayList(getWorkUnitWithTopicPartition("topic1", 1), getWorkUnitWithTopicPartition("topic1", 2),
+            getWorkUnitWithTopicPartition("topic1", 3)), "topic2", Lists
+        .newArrayList(getWorkUnitWithTopicPartition("topic2", 1), getWorkUnitWithTopicPartition("topic2", 2),
+            getWorkUnitWithTopicPartition("topic2", 3)));
+
+    List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10);
+    Assert.assertEquals(workUnits.size(), 4);
+
+    Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1");
+    Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 1);
+    Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 1)), 2);
+    Assert.assertEquals(workUnits.get(1).getProp(KafkaSource.TOPIC_NAME), "topic1");
+    Assert.assertEquals(workUnits.get(1).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 3);
+
+    Assert.assertEquals(workUnits.get(2).getProp(KafkaSource.TOPIC_NAME), "topic2");
+    Assert.assertEquals(workUnits.get(2).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 1);
+    Assert.assertEquals(workUnits.get(2).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 1)), 2);
+    Assert.assertEquals(workUnits.get(3).getProp(KafkaSource.TOPIC_NAME), "topic2");
+    Assert.assertEquals(workUnits.get(3).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 3);
+  }
+
+
+  public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) {
+    WorkUnit workUnit = new WorkUnit(new Extract(Extract.TableType.APPEND_ONLY, "kafka", topic));
+    workUnit.setProp(KafkaSource.TOPIC_NAME, topic);
+    workUnit.setProp(KafkaSource.PARTITION_ID, Integer.toString(partition));
+    workUnit.setProp(KafkaSource.LEADER_HOSTANDPORT, "host:1234");
+    workUnit.setProp(KafkaSource.LEADER_ID, "1");
+
+    workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, "0");
+    workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, "0");
+
+    workUnit.setProp("previousStartFetchEpochTime", "0");
+    workUnit.setProp("previousStopFetchEpochTime", "0");
+    workUnit.setProp("previousLowWatermark", "0");
+    workUnit.setProp("previousHighWatermark", "0");
+    workUnit.setProp("previousLatestOffset", "0");
+    workUnit.setProp("offsetFetchEpochTime", "0");
+    workUnit.setProp("previousOffsetFetchEpochTime", "0");
+
+    return workUnit;
+  }
+
+  @Test
+  public void testGetContainerCapacityForTopic() {
+    double delta = 0.000001; //Error tolerance limit for assertions involving double.
+    KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy strategy =
+        KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MIN;
+    List<Double> capacities = Arrays.asList(new Double[]{1.2, 1.4, 1.3, 1.4, 1.2});
+    double capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+    Assert.assertEquals(capacity, 1.2, delta);
+    strategy = KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MAX;
+    capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+    Assert.assertEquals(capacity, 1.4, delta);
+    strategy = KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MEDIAN;
+    capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+    Assert.assertEquals(capacity, 1.3, delta);
+    strategy = KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MEAN;
+    capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+    Assert.assertEquals(capacity, 1.3, delta);
+
+    //Validate the median for an even-sized list
+    capacities = Arrays.asList(new Double[]{1.2, 1.4, 1.3, 1.4});
+    strategy = KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MEDIAN;
+    capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+    Assert.assertEquals(capacity, 1.35, delta);
+  }
+}
\ No newline at end of file
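
One note on the MEDIAN assertions above: for the even-sized list {1.2, 1.3, 1.4, 1.4} the expected value 1.35 follows the usual convention of averaging the two middle values after sorting. A standalone sketch of that convention (an illustration, not the committed getContainerCapacityForTopic logic) is:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MedianSketch {
  // Median convention assumed here: middle element for odd-sized lists,
  // mean of the two middle elements for even-sized lists.
  static double median(List<Double> values) {
    List<Double> sorted = new ArrayList<>(values);
    Collections.sort(sorted);
    int n = sorted.size();
    return (n % 2 == 1) ? sorted.get(n / 2)
        : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
  }

  public static void main(String[] args) {
    System.out.println(median(Arrays.asList(1.2, 1.4, 1.3, 1.4, 1.2))); // 1.3
    System.out.println(median(Arrays.asList(1.2, 1.4, 1.3, 1.4)));      // 1.35, as asserted above
  }
}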
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
new file mode 100644
index 0000000..d199295
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.gson.Gson;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
+
+public class ProduceRateAndLagBasedWorkUnitSizeEstimatorTest {
+  private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class);
+  private static final String TEST_TOPIC = "test";
+  private static final long AVG_RECORD_SIZE = 1024L;
+  private static final String BINPACKING_TIME_1 = "11/17/2019 23:10:00";
+  private static final String BINPACKING_TIME_2 = "11/19/2019 08:00:00";
+
+  private double[][] avgProduceRates = new double[7][24];
+  private ProduceRateAndLagBasedWorkUnitSizeEstimator estimator;
+
+  @BeforeClass
+  public void setUp() {
+    double rate = 1.0;
+    for (int i = 0; i < 7; i++) {
+      for (int j = 0; j < 24; j++) {
+        if (i == 2) {
+          avgProduceRates[i][j] = -1.0;
+        } else {
+          avgProduceRates[i][j] = rate * ProduceRateAndLagBasedWorkUnitSizeEstimator.ONE_MEGA_BYTE;
+          rate++;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testCalcEstimatedSize() throws ParseException {
+    SourceState sourceState = new SourceState();
+    sourceState.setProp(ProduceRateAndLagBasedWorkUnitSizeEstimator.CATCHUP_SLA_IN_HOURS_KEY, 3);
+    sourceState.setProp(ProduceRateAndLagBasedWorkUnitSizeEstimator.REPLANNING_INTERVAL_IN_HOURS_KEY, 3);
+    sourceState.setProp(ProduceRateAndLagBasedWorkUnitSizeEstimator.PRODUCE_RATE_SCALING_FACTOR_KEY, 1);
+    this.estimator = new ProduceRateAndLagBasedWorkUnitSizeEstimator(sourceState);
+    SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
+    format.setTimeZone(TimeZone.getDefault());
+
+    //WorkUnit with no KafkaWatermark
+    KafkaStreamingExtractor.KafkaWatermark watermark = null;
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK, GSON.toJson(watermark));
+    workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY, 1.0);
+    workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, Long.toString(6 * 3600 * 1024));
+    workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS, format.parse(BINPACKING_TIME_1).getTime());
+    Assert.assertEquals((long) this.estimator.calcEstimatedSize(workUnit), 1L);
+
+    //WorkUnit with Kafka watermark and previous avg produce rates
+    watermark = new KafkaStreamingExtractor.KafkaWatermark(new KafkaPartition.Builder().withTopicName(TEST_TOPIC).withId(0).build(), new LongWatermark(0L));
+    watermark.setAvgRecordSize(AVG_RECORD_SIZE);
+    watermark.setAvgProduceRates(avgProduceRates);
+    workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK, GSON.toJson(watermark));
+    Assert.assertEquals((long) this.estimator.calcEstimatedSize(workUnit), 29L);
+
+    //WorkUnit with Kafka watermark but no previous avg produce rates
+    workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS, format.parse(BINPACKING_TIME_2).getTime());
+    workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY, 2.0);
+    Assert.assertEquals((long) this.estimator.calcEstimatedSize(workUnit), 4L);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index e6d1785..13c883e 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -38,6 +38,7 @@ compileJava {
 dependencies {
   compile project(":gobblin-api")
   compile project(":gobblin-core")
+  compile project(":gobblin-core-base")
   compile project(":gobblin-hive-registration")
   compile project(":gobblin-metrics-libs:gobblin-metrics")
   compile project(":gobblin-metastore")
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index a9c0ac4..b7c87d1 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -78,6 +78,7 @@ ext.externalDependency = [
     "jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
     "jmh": "org.openjdk.jmh:jmh-core:1.17.3",
     "jmhAnnotations": "org.openjdk.jmh:jmh-generator-annprocess:1.17.3",
+    "jollyday": "de.jollyday:jollyday:0.4.9",
     "kafka08": "org.apache.kafka:kafka_2.11:" + kafka08Version,
     "kafka08Test": "org.apache.kafka:kafka_2.11:" + kafka08Version + ":test",
     "kafka08Client": "org.apache.kafka:kafka-clients:" + kafka08Version,
@@ -174,13 +175,13 @@ ext.externalDependency = [
     /**
      * Avoiding conflicts with Hive 1.x versions existed in the classpath
      */
-    "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.3:nohive",
-    "orcCore": "org.apache.orc:orc-core:1.6.3:nohive",
-    "orcTools":"org.apache.orc:orc-tools:1.6.3",
-    'parquet': 'org.apache.parquet:parquet-hadoop:1.11.0',
-    'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.0',
-    'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.0',
-    'parquetHadoop': 'org.apache.parquet:parquet-hadoop-bundle:1.11.0',
+    "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.5:nohive",
+    "orcCore": "org.apache.orc:orc-core:1.6.5:nohive",
+    "orcTools":"org.apache.orc:orc-tools:1.6.5",
+    'parquet': 'org.apache.parquet:parquet-hadoop:1.11.1',
+    'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.1',
+    'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.1',
+    'parquetHadoop': 'org.apache.parquet:parquet-hadoop-bundle:1.11.1',
     'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
     "slf4j": [
         "org.slf4j:slf4j-api:" + slf4jVersion,