You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/12/04 05:20:41 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1259] Implement
a Kafka streaming extractor
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 44d7aa0 [GOBBLIN-1259] Implement a Kafka streaming extractor
44d7aa0 is described below
commit 44d7aa00b1a8ecf50974d1004f8ff18677eeffed
Author: Sudarshan Vasudevan <sv...@users.noreply.github.com>
AuthorDate: Thu Dec 3 21:20:34 2020 -0800
[GOBBLIN-1259] Implement a Kafka streaming extractor
Closes #3102 from sv2000/kafkaStreamingExtractor
---
.../runtime/CheckpointableWatermarkState.java | 2 +-
.../apache/gobblin/stream/FlushRecordEnvelope.java | 23 +
gobblin-core-base/build.gradle | 1 +
.../extractor/extract/FlushingExtractor.java | 367 ++++++++++++++
.../runtime/StateStoreBasedWatermarkStorage.java | 0
.../apache/gobblin/metrics/MetricContextUtils.java | 40 ++
.../couchbase/writer/CouchbaseWriterBuilder.java | 4 +-
gobblin-modules/gobblin-kafka-common/build.gradle | 2 +-
.../kafka/client/GobblinKafkaConsumerClient.java | 3 +
.../extractor/extract/kafka/KafkaPartition.java | 2 +-
.../extract/kafka/KafkaProduceRateTracker.java | 311 ++++++++++++
.../extract/kafka/KafkaStreamingExtractor.java | 554 +++++++++++++++++++++
.../packer/KafkaTopicGroupingWorkUnitPacker.java | 350 +++++++++++++
...roduceRateAndLagBasedWorkUnitSizeEstimator.java | 150 ++++++
.../packer/UnitKafkaWorkUnitSizeEstimator.java | 36 ++
.../gobblin/metrics/kafka/LoggingPusherTest.java | 2 +
.../extract/kafka/KafkaExtractorUtils.java | 93 ++++
.../extract/kafka/KafkaProduceRateTrackerTest.java | 220 ++++++++
.../extract/kafka/KafkaStreamTestUtils.java | 314 ++++++++++++
.../extract/kafka/KafkaStreamingExtractorTest.java | 134 +++++
.../KafkaTopicGroupingWorkUnitPackerTest.java | 154 ++++++
...ceRateAndLagBasedWorkUnitSizeEstimatorTest.java | 94 ++++
gobblin-runtime/build.gradle | 1 +
gradle/scripts/dependencyDefinitions.gradle | 15 +-
24 files changed, 2860 insertions(+), 12 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
similarity index 96%
rename from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
rename to gobblin-api/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
index 791f1b3..08dc5fa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
@@ -25,7 +25,7 @@ import org.apache.gobblin.source.extractor.CheckpointableWatermark;
/**
* Making {@link CheckpointableWatermark} look like {@link State} so it can be
- * stored in a {@link org.apache.gobblin.metastore.StateStore}.
+ * stored in a Gobblin state store.
*/
public class CheckpointableWatermarkState extends State {
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushRecordEnvelope.java b/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushRecordEnvelope.java
new file mode 100644
index 0000000..bc73965
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushRecordEnvelope.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.stream;
+
+/**
+ * A {@link RecordEnvelope} that is constructed with a {@code null} record and carries no data of its own.
+ * FlushingExtractor#readStreamEntityImpl() checks for instances of this class: when the underlying extractor
+ * returns one, a flush control message is emitted in place of the envelope.
+ */
+public class FlushRecordEnvelope extends RecordEnvelope {
+ public FlushRecordEnvelope() {
+ super(null);
+ }
+}
diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index 4c9b667..fb7a4a7 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -21,6 +21,7 @@ apply plugin: 'me.champeau.gradle.jmh'
dependencies {
compile project(":gobblin-api")
+ compile project(":gobblin-metastore")
compile project(":gobblin-utility")
compile project(":gobblin-metrics-libs:gobblin-metrics")
compile project(":gobblin-modules:gobblin-codecs")
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
new file mode 100644
index 0000000..9391a30
--- /dev/null
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.ack.Ackable;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metrics.MetricContextUtils;
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.runtime.StateStoreBasedWatermarkStorage;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.stream.FlushControlMessage;
+import org.apache.gobblin.stream.FlushRecordEnvelope;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.stream.StreamEntity;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.writer.LastWatermarkTracker;
+import org.apache.gobblin.writer.WatermarkStorage;
+import org.apache.gobblin.writer.WatermarkTracker;
+
+
+/**
+ * An abstract class that implements an {@link EventBasedExtractor}. The {@link FlushingExtractor} injects a
+ * {@link FlushControlMessage} at a frequency determined by {@value #FLUSH_INTERVAL_SECONDS_KEY}.
+ * The {@link FlushControlMessage} blocks further messages being pushed until the outstanding {@link FlushControlMessage}
+ * has been acked by the real writer. On a successful ack, the extractor invokes a publish on the underlying DataPublisher,
+ * which moves the data from the task output location to the final publish location. After a successful publish, the FlushingExtractor
+ * commits the watermarks to the {@link WatermarkStorage}. Note the watermark committed to the watermark storage is the
+ * last successfully acked Watermark. Individual extractor implementations should start seeking from the next watermark
+ * past this watermark. For example, in the case of a Kafka extractor, the consumer should seek to the offset that is
+ * one more than the last committed watermark.
+ *
+ * Individual extractor implementations that extend the FlushingExtractor need to implement the following method:
+ * <ul>
+ * <li>readRecordEnvelopeImpl() - to return the next {@link RecordEnvelope}.</li>
+ * </ul>
+ *
+ * The FlushingExtractor allows applications to plug-in pre and post {@link CommitStep}s as actions to be performed before and after
+ * each commit.
+ * @param <D> type of {@link RecordEnvelope}
+ */
+
+@Slf4j
+public abstract class FlushingExtractor<S, D> extends EventBasedExtractor<S, D> {
+ public static final String GOBBLIN_EXTRACTOR_PRECOMMIT_STEPS = "gobblin.extractor.precommit.steps";
+ public static final String GOBBLIN_EXTRACTOR_POSTCOMMIT_STEPS = "gobblin.extractor.postcommit.steps";
+ public static final String FLUSH_INTERVAL_SECONDS_KEY = "stream.flush.interval.secs";
+ public static final Long DEFAULT_FLUSH_INTERVAL_SECONDS = 60L;
+
+ public static final String FLUSH_DATA_PUBLISHER_CLASS = "flush.data.publisher.class";
+ public static final String DEFAULT_FLUSH_DATA_PUBLISHER_CLASS = "org.apache.gobblin.publisher.BaseDataPublisher";
+
+ public static final String WATERMARK_COMMIT_TIME_METRIC = "state.store.metrics.watermarkCommitTime";
+ public static final String COMMIT_STEP_METRIC_PREFIX = "commit.step.";
+
+ @Getter
+ protected Map<String, CheckpointableWatermark> lastCommittedWatermarks;
+
+ private final List<String> preCommitSteps;
+ private final List<String> postCommitSteps;
+
+ private final Map<String, CommitStep> commitStepMap = Maps.newHashMap();
+
+ private final AtomicLong watermarkCommitTime = new AtomicLong(0L);
+ private final List<AtomicLong> preCommitStepTimes = Lists.newArrayList();
+ private final List<AtomicLong> postCommitStepTimes = Lists.newArrayList();
+
+ protected Config config;
+ @Setter
+ private Optional<WatermarkStorage> watermarkStorage;
+ @Getter
+ protected WatermarkTracker watermarkTracker;
+ protected Long flushIntervalMillis;
+
+ protected Long timeOfLastFlush = System.currentTimeMillis();
+ private FlushAckable lastFlushAckable;
+ private boolean hasOutstandingFlush = false;
+ private Optional<DataPublisher> flushPublisher = Optional.absent();
+ protected WorkUnitState workUnitState;
+
+ /**
+  * Creates the extractor from the given work unit state.
+  *
+  * <p>Reads the flush interval (configured in seconds, stored in milliseconds) and the pre/post commit step
+  * aliases from the state's properties, sets up watermark tracking and state-store backed watermark storage,
+  * instantiates the flush publisher, and registers the watermark-commit and commit-step timing gauges.
+  *
+  * @param state the work unit state this extractor is created for
+  */
+ public FlushingExtractor(WorkUnitState state) {
+   super(state);
+   this.workUnitState = state;
+   this.config = ConfigFactory.parseProperties(state.getProperties());
+   this.flushIntervalMillis =
+       ConfigUtils.getLong(config, FLUSH_INTERVAL_SECONDS_KEY, DEFAULT_FLUSH_INTERVAL_SECONDS) * 1000;
+   this.watermarkTracker = new LastWatermarkTracker(false);
+   this.watermarkStorage = Optional.of(new StateStoreBasedWatermarkStorage(state));
+   this.preCommitSteps = ConfigUtils.getStringList(config, GOBBLIN_EXTRACTOR_PRECOMMIT_STEPS);
+   this.postCommitSteps = ConfigUtils.getStringList(config, GOBBLIN_EXTRACTOR_POSTCOMMIT_STEPS);
+   // One timing slot per configured commit step, index-aligned with the step lists.
+   preCommitSteps.stream().map(commitStep -> new AtomicLong(0L)).forEach(this.preCommitStepTimes::add);
+   postCommitSteps.stream().map(commitStep -> new AtomicLong(0L)).forEach(this.postCommitStepTimes::add);
+
+   initFlushPublisher();
+   MetricContextUtils.registerGauge(this.getMetricContext(), WATERMARK_COMMIT_TIME_METRIC, this.watermarkCommitTime);
+   initCommitStepMetrics(this.preCommitSteps, this.preCommitStepTimes);
+   initCommitStepMetrics(this.postCommitSteps, this.postCommitStepTimes);
+ }
+
+ /**
+  * Registers one gauge per commit step, backed by the timing slot that doCommitSequence updates for that step.
+  * Binding the gauge to the shared {@link AtomicLong} (rather than to a fresh counter) is what makes the
+  * reported value reflect the measured step duration.
+  *
+  * @param commitSteps aliases of the commit steps, in execution order
+  * @param commitStepTimes timing slots, index-aligned with {@code commitSteps}
+  */
+ private void initCommitStepMetrics(List<String> commitSteps, List<AtomicLong> commitStepTimes) {
+   for (int i = 0; i < commitSteps.size(); i++) {
+     String metricName = COMMIT_STEP_METRIC_PREFIX + commitSteps.get(i) + ".time";
+     MetricContextUtils.registerGauge(this.getMetricContext(), metricName, commitStepTimes.get(i));
+   }
+ }
+
+ /**
+  * Returns a new flush message when more than the configured flush interval has elapsed since the last flush,
+  * or {@code null} when no flush is due yet.
+  */
+ private StreamEntity<D> generateFlushMessageIfNecessary() {
+   Long now = System.currentTimeMillis();
+   boolean flushDue = (now - timeOfLastFlush) > this.flushIntervalMillis;
+   if (!flushDue) {
+     return null;
+   }
+   return generateFlushMessage(now);
+ }
+
+ /**
+  * Builds a flush control message with an attached {@link FlushAckable} and records it as the outstanding flush.
+  *
+  * @param currentTime timestamp (epoch millis) stored as the time of the last flush
+  * @return the flush control message to emit downstream
+  */
+ private StreamEntity<D> generateFlushMessage(Long currentTime) {
+   log.debug("Injecting flush control message");
+   FlushControlMessage<D> controlMessage = FlushControlMessage.<D>builder().flushReason("Timed flush").build();
+   // The ackable is what readStreamEntityImpl() later blocks on until the flush has been processed.
+   FlushAckable ackable = new FlushAckable();
+   controlMessage.addCallBack(ackable);
+
+   // Track the most recent flush so that the next read waits for its ack.
+   this.lastFlushAckable = ackable;
+   this.hasOutstandingFlush = true;
+   this.timeOfLastFlush = currentTime;
+   return controlMessage;
+ }
+
+ /**
+ * Initializes the {@link DataPublisher} used for publishing after a flush, unless one is already present.
+ * The publisher class name is read from {@value #FLUSH_DATA_PUBLISHER_CLASS} (defaulting to
+ * {@value #DEFAULT_FLUSH_DATA_PUBLISHER_CLASS}) and the instance is created reflectively via
+ * {@link GobblinConstructorUtils#invokeLongestConstructor}, passing the work unit state as the argument.
+ * @throws RuntimeException wrapping the {@link ReflectiveOperationException} if the publisher cannot be instantiated
+ */
+ private void initFlushPublisher() {
+ if (this.flushPublisher.isPresent()) {
+ return;
+ }
+ String publisherClassName =
+ ConfigUtils.getString(this.config, FLUSH_DATA_PUBLISHER_CLASS, DEFAULT_FLUSH_DATA_PUBLISHER_CLASS);
+
+ try {
+ this.flushPublisher = (Optional<DataPublisher>) Optional.of(
+ GobblinConstructorUtils.invokeLongestConstructor(Class.forName(publisherClassName), this.workUnitState));
+ } catch (ReflectiveOperationException e) {
+ log.error("Error in instantiating Data Publisher");
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Returns the next stream entity: either a flush control message or the next record envelope.
+ *
+ * If a flush is outstanding, this call first blocks until it has been acked, then runs the pre-commit steps,
+ * publishes the task output, invokes {@link #onFlushAck()} and runs the post-commit steps, in that order.
+ * Afterwards a new flush message is returned if the flush interval has elapsed; otherwise the next record is
+ * read via {@link #readRecordEnvelopeImpl()}. A {@link FlushRecordEnvelope} returned by the underlying extractor
+ * is replaced by a flush message; any other non-null record has its watermark registered with the watermark
+ * tracker before being returned.
+ *
+ * @return a flush control message, the next record envelope, or whatever {@link #readRecordEnvelopeImpl()}
+ * returned (including {@code null})
+ * @throws RuntimeException if the outstanding flush was nacked with an error
+ */
+ @Override
+ public StreamEntity<D> readStreamEntityImpl() throws DataRecordException, IOException {
+ //Block until an outstanding flush has been Ack-ed.
+ if (this.hasOutstandingFlush) {
+ Throwable error = this.lastFlushAckable.waitForAck();
+ if (error != null) {
+ throw new RuntimeException("Error waiting for flush ack", error);
+ }
+
+ //Reset outstandingFlush flag; this happens before the commit sequence below runs.
+ this.hasOutstandingFlush = false;
+
+ //Run pre-commit steps
+ doCommitSequence(preCommitSteps, true);
+
+ //Publish task output to final publish location.
+ publishTaskOutput();
+
+ //Provide a callback to the underlying extractor to handle logic for flush ack.
+ onFlushAck();
+
+ //Run post-commit steps
+ doCommitSequence(postCommitSteps, false);
+ }
+
+ //A timed flush takes precedence over reading the next record.
+ StreamEntity<D> entity = generateFlushMessageIfNecessary();
+ if (entity != null) {
+ return entity;
+ }
+
+ //return the next read record.
+ RecordEnvelope<D> recordEnvelope = readRecordEnvelopeImpl();
+ //A FlushRecordEnvelope from the underlying extractor triggers a flush instead of being emitted.
+ if (recordEnvelope instanceof FlushRecordEnvelope) {
+ StreamEntity<D> flushMessage = generateFlushMessage(System.currentTimeMillis());
+ return flushMessage;
+ }
+ if (recordEnvelope != null) {
+ this.watermarkTracker.unacknowledgedWatermark(recordEnvelope.getWatermark());
+ }
+
+ return recordEnvelope;
+ }
+
+ /**
+ * A method that instantiates a {@link CommitStep} given an alias. This base implementation always returns
+ * {@code null}. The commit sequence calls {@code execute()} on the returned step, so extractors that configure
+ * pre or post commit steps need to override this method and return a non-null {@link CommitStep}.
+ * @param commitStepAlias alias or fully qualified class name of the {@link CommitStep}.
+ * @param isPrecommit {@code true} when the step is requested for the pre-commit sequence, {@code false} for the
+ * post-commit sequence.
+ * @return the {@link CommitStep} for the alias; {@code null} in this base implementation.
+ * @throws IOException
+ */
+ public CommitStep initCommitStep(String commitStepAlias, boolean isPrecommit) throws IOException {
+ return null;
+ }
+
+ /**
+  * Executes the given commit steps in list order and records how long each one took.
+  *
+  * <p>Each step is created on first use through {@link #initCommitStep(String, boolean)} and cached by alias
+  * for subsequent flushes. The elapsed wall-clock time of a step (including its one-time creation) is stored
+  * in the timing slot with the same index as the step.
+  *
+  * @param commitSteps aliases of the commit steps to run
+  * @param isPrecommit {@code true} for the pre-commit sequence, {@code false} for the post-commit sequence
+  * @throws IOException if a step cannot be instantiated or its execution fails
+  */
+ private void doCommitSequence(List<String> commitSteps, boolean isPrecommit) throws IOException {
+   List<AtomicLong> stepTimes = isPrecommit ? preCommitStepTimes : postCommitStepTimes;
+   for (int i = 0; i < commitSteps.size(); i++) {
+     long startTimeMillis = System.currentTimeMillis();
+     String commitStepAlias = commitSteps.get(i);
+     CommitStep commitStep = commitStepMap.get(commitStepAlias);
+     if (commitStep == null) {
+       commitStep = initCommitStep(commitStepAlias, isPrecommit);
+       if (commitStep == null) {
+         // The default initCommitStep() returns null; fail with a clear message instead of an NPE below.
+         throw new IOException("Could not instantiate commit step: " + commitStepAlias);
+       }
+       commitStepMap.put(commitStepAlias, commitStep);
+     }
+     log.info("Calling commit step: {}", commitStepAlias);
+     commitStep.execute();
+     stepTimes.get(i).set(System.currentTimeMillis() - startTimeMillis);
+   }
+ }
+
+ /**
+ * A callback for the underlying extractor to implement logic for handling the completion of a flush. It is
+ * invoked from {@link #readStreamEntityImpl()} after the task output has been published and before the
+ * post-commit steps run. This default implementation checkpoints the watermarks; an underlying
+ * Extractor can override this method.
+ * @throws IOException if checkpointing the watermarks fails
+ */
+ protected void onFlushAck() throws IOException {
+ checkPointWatermarks();
+ }
+
+ /**
+  * A method that returns the latest committed watermarks back to the caller. This method will be typically called
+  * by the underlying extractor during the initialization phase to retrieve the latest watermarks. The result is
+  * also cached in {@code lastCommittedWatermarks}.
+  *
+  * <p>If the watermarks cannot be read from the {@link WatermarkStorage}, a warning is logged and an empty map is
+  * returned. That map is mutable on purpose: the same instance is later updated when watermarks are checkpointed.
+  *
+  * @param checkPointableWatermarkClass a {@link CheckpointableWatermark} class
+  * @param partitions a collection of partitions assigned to the extractor
+  * @return the latest committed watermarks as a map of (source, watermark) pairs. For example, in the case of a KafkaStreamingExtractor,
+  * this map would be a collection of (TopicPartition, KafkaOffset) pairs.
+  * @throws IllegalArgumentException if {@code checkPointableWatermarkClass} is not a {@link CheckpointableWatermark} class
+  */
+ public Map<String, CheckpointableWatermark> getCommittedWatermarks(Class checkPointableWatermarkClass,
+     Iterable<String> partitions) {
+   Preconditions.checkArgument(CheckpointableWatermark.class.isAssignableFrom(checkPointableWatermarkClass),
+       "Watermark class " + checkPointableWatermarkClass.toString() + " is not a CheckPointableWatermark class");
+   try {
+     this.lastCommittedWatermarks =
+         this.watermarkStorage.get().getCommittedWatermarks(checkPointableWatermarkClass, partitions);
+   } catch (Exception e) {
+     // failed to get watermarks ... log a warning message
+     log.warn("Failed to get watermarks... will start from the beginning", e);
+     // Must be mutable: Collections.EMPTY_MAP would throw on the put performed at the next checkpoint.
+     this.lastCommittedWatermarks = Maps.newHashMap();
+   }
+   return this.lastCommittedWatermarks;
+ }
+
+ /**
+  * Publishes the output of this task's work unit to the final publish location using the flush publisher.
+  *
+  * @throws IOException if no flush publisher is available or the publish itself fails
+  */
+ protected void publishTaskOutput() throws IOException {
+   if (this.flushPublisher.isPresent()) {
+     DataPublisher publisher = this.flushPublisher.get();
+     publisher.publish(Collections.singletonList(workUnitState));
+     return;
+   }
+   throw new IOException("Publish called without a flush publisher");
+ }
+
+ /**
+  * Persists the watermarks returned by {@link WatermarkTracker#getAllUnacknowledgedWatermarks()} to the
+  * {@link WatermarkStorage}. The method is called after a {@link FlushControlMessage} has been acknowledged.
+  * To make retrieval of the last committed watermarks efficient, the committed watermarks are also cached in
+  * {@code lastCommittedWatermarks}. When no watermark storage is present, a warning is logged and nothing is
+  * committed or cached.
+  *
+  * @throws IOException if committing the watermarks to the storage fails
+  */
+ private void checkPointWatermarks() throws IOException {
+   Map<String, CheckpointableWatermark> unacknowledgedWatermarks =
+       this.watermarkTracker.getAllUnacknowledgedWatermarks();
+   if (!this.watermarkStorage.isPresent()) {
+     log.warn("No watermarkStorage found; Skipping checkpointing");
+     return;
+   }
+
+   long commitBeginTime = System.currentTimeMillis();
+   this.watermarkStorage.get().commitWatermarks(unacknowledgedWatermarks.values());
+   this.watermarkCommitTime.set(System.currentTimeMillis() - commitBeginTime);
+
+   //Cache the last committed watermarks. The cache is still null when a flush completes before
+   //getCommittedWatermarks() has ever been called, so create it lazily instead of failing with an NPE.
+   if (this.lastCommittedWatermarks == null) {
+     this.lastCommittedWatermarks = Maps.newHashMap();
+   }
+   this.lastCommittedWatermarks.putAll(unacknowledgedWatermarks);
+ }
+
+ /**
+ * A method to be implemented by the underlying extractor that returns the next record as an instance of
+ * {@link RecordEnvelope}
+ * @return the next {@link RecordEnvelope} instance read from the source
+ */
+ public abstract RecordEnvelope<D> readRecordEnvelopeImpl() throws DataRecordException, IOException;
+
+ /**
+  * {@link Ackable} for waiting for the flush control message to be processed. A single ack or nack releases
+  * the waiter; a nack additionally records the error so that it can be returned from {@link #waitForAck()}.
+  */
+ private static class FlushAckable implements Ackable {
+   // Written before countDown() and read after await(), so the latch orders the write before the read.
+   private Throwable error;
+   private final CountDownLatch processed;
+
+   public FlushAckable() {
+     this.processed = new CountDownLatch(1);
+   }
+
+   @Override
+   public void ack() {
+     this.processed.countDown();
+   }
+
+   @Override
+   public void nack(Throwable error) {
+     this.error = error;
+     this.processed.countDown();
+   }
+
+   /**
+    * Wait for ack
+    * @return any error encountered, or {@code null} if the flush was acked successfully
+    * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is restored first
+    */
+   public Throwable waitForAck() {
+     try {
+       this.processed.await();
+       return this.error;
+     } catch (InterruptedException e) {
+       // Restore the interrupt status so that callers further up the stack can still observe it.
+       Thread.currentThread().interrupt();
+       throw new RuntimeException("interrupted while waiting for ack", e);
+     }
+   }
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
similarity index 100%
rename from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
rename to gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java
new file mode 100644
index 0000000..41c2bf9
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContextUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Gauge;
+import com.google.common.util.concurrent.AtomicDouble;
+
+
+/**
+ * Static helpers for registering gauges on a {@link MetricContext}.
+ */
+public class MetricContextUtils {
+
+  private MetricContextUtils() {
+  }
+
+  /**
+   * Registers a gauge under {@code metricName} that reports the current value of {@code atomicLong}.
+   */
+  public static void registerGauge(MetricContext metricContext, String metricName, AtomicLong atomicLong) {
+    Gauge<Long> longGauge = metricContext.newContextAwareGauge(metricName, atomicLong::get);
+    metricContext.register(metricName, longGauge);
+  }
+
+  /**
+   * Registers a gauge under {@code metricName} that reports the current value of {@code atomicDouble}.
+   */
+  public static void registerGauge(MetricContext metricContext, String metricName, AtomicDouble atomicDouble) {
+    Gauge<Double> doubleGauge = metricContext.newContextAwareGauge(metricName, atomicDouble::get);
+    metricContext.register(metricName, doubleGauge);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
index de155da..d961ddb 100644
--- a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
+++ b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
@@ -18,10 +18,10 @@
package org.apache.gobblin.couchbase.writer;
import com.couchbase.client.java.env.CouchbaseEnvironment;
+import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.Properties;
-import junit.framework.Assert;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
@@ -34,7 +34,7 @@ import org.apache.log4j.Logger;
public class CouchbaseWriterBuilder extends DataWriterBuilder {
private static final Logger LOG = Logger.getLogger(CouchbaseWriterBuilder.class);
public DataWriter build(Config config) throws IOException {
- Assert.assertNotNull("Config cannot be null", config);
+ Preconditions.checkArgument(config != null, "Config cannot be null");
config.entrySet().stream().forEach(x -> String.format("Config passed to factory builder '%s':'%s'", x.getKey(), x.getValue().toString()));
CouchbaseEnvironment couchbaseEnvironment = CouchbaseEnvironmentFactory.getInstance(config);
diff --git a/gobblin-modules/gobblin-kafka-common/build.gradle b/gobblin-modules/gobblin-kafka-common/build.gradle
index d92774b..8cd83a4 100644
--- a/gobblin-modules/gobblin-kafka-common/build.gradle
+++ b/gobblin-modules/gobblin-kafka-common/build.gradle
@@ -40,7 +40,7 @@ dependencies {
compile externalDependency.slf4j
compile externalDependency.typesafeConfig
compile externalDependency.findBugsAnnotations
-
+ compile externalDependency.jollyday
testCompile project(":gobblin-test-utils")
testCompile externalDependency.mockito
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index 684a48c..c75408e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -28,6 +28,7 @@ import com.codahale.metrics.Metric;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
@@ -166,6 +167,8 @@ public interface GobblinKafkaConsumerClient extends Closeable {
return -1L;
}
+ public default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }
+
/**
* A factory to create {@link GobblinKafkaConsumerClient}s
*/
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
index 29cea5b..df00618 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
@@ -99,7 +99,7 @@ public final class KafkaPartition {
@Override
public String toString() {
- return this.getTopicName() + ":" + this.getId();
+ return this.getTopicName() + "-" + this.getId();
}
@Override
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
new file mode 100644
index 0000000..b3e67e6
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.EvictingQueue;
+import com.google.common.collect.Maps;
+
+import de.jollyday.HolidayManager;
+import de.jollyday.parameter.UrlManagerParameter;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.writer.WatermarkTracker;
+
+/**
+ * A helper class that tracks the produce rate for each TopicPartition currently consumed by the {@link KafkaStreamingExtractor}.
+ * The produce rates are stored in the {@link org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor.KafkaWatermark}
+ * and checkpointed to the {@link org.apache.gobblin.writer.WatermarkStorage} along with the watermarks.
+ *
+ * The produce rates are maintained based on hour-of-day and day-of-week, and are computed in bytes/sec. The new produce
+ * rates estimates are obtained as Exponentially Weighted Moving Average (EWMA) as:
+ * new_produce_rate = a * avg_rate_in_current_window + (1 - a) * historic_produce_rate, where:
+ * "a" is the exponential decay factor. Small values of "a" result in estimates updated more slowly, while large values of
+ * "a" will result in giving more weight to recent values.
+ */
+public class KafkaProduceRateTracker {
+ private static final String KAFKA_PRODUCE_RATE_EXPONENTIAL_DECAY_FACTOR_KEY =
+ "gobblin.kafka.produceRateTracker.exponentialDecayFactor";
+ private static final Double DEFAULT_KAFKA_PRODUCE_RATE_EXPONENTIAL_DECAY_FACTOR = 0.375;
+ static final String KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY = "gobblin.kafka.produceRateTracker.disableStatsOnHolidays";
+ private static final Boolean DEFAULT_KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS = false;
+ private static final String KAFKA_PRODUCE_RATE_HOLIDAY_LOCALE_KEY = "gobblin.kafka.produceRateTracker.holidayLocale";
+ private static final String DEFAULT_KAFKA_PRODUCE_RATE_HOLIDAY_LOCALE = "ca";
+ private static final String HOLIDAY_FILE = "Holidays.xml";
+ private static final DateTimeZone DEFAULT_TIME_ZONE = DateTimeZone.getDefault();
+ static final int SLIDING_WINDOW_SIZE = 3;
+
+ static final String KAFKA_PARTITION_PRODUCE_RATE_KEY = "produceRate";
+
+ /**
+ * The insertion order of this map has to be maintained because:
+ * 1. The API provided by {@link KafkaExtractorStatsTracker} accepts the index of a partition rather than the
+ * partition object used as the key of {@link #partitionsToProdRate}.
+ * 2. When traversing {@link #partitionsToProdRate} to update the produce rates, we need to find the previous
+ * watermark value to help calculate the produce rate, and that value is also indexed by partition index.
+ * 3. To keep each entry of {@link #partitionsToProdRate} aligned with the matching entry in
+ * {@link KafkaExtractorStatsTracker} and with its watermark, the order of entries in this map, which is defined by
+ * the input list of {@link KafkaPartition}s, must be preserved; the same input list also determines the entry
+ * order in {@link KafkaExtractorStatsTracker}.
+ */
+ @Getter
+ private final LinkedHashMap<KafkaPartition, Double> partitionsToProdRate;
+ private final WatermarkTracker watermarkTracker;
+ private final KafkaExtractorStatsTracker statsTracker;
+ private final Double exponentialDecayFactor;
+ private final Boolean disableStatsOnHolidays;
+ private final HolidayManager holidayManager;
+ private final long flushIntervalSecs;
+ private final String holidayLocale;
+ private Long lastReportTimeMillis;
+ private final Map<LocalDate, Boolean> holidayMap = Maps.newHashMap();
+
+ private final EvictingQueue<Long> ingestionLatencies = EvictingQueue.create(SLIDING_WINDOW_SIZE);
+ private final EvictingQueue<Double> consumptionRateMBps = EvictingQueue.create(SLIDING_WINDOW_SIZE);
+
+ /**
+  * Creates a tracker whose first reporting window starts at the current wall-clock time.
+  *
+  * @param state work unit state supplying the tracker configuration
+  * @param partitions partitions to track; their order defines the partition index used by the stats tracker
+  * @param watermarkTracker tracker holding the unacknowledged watermarks that receive the computed statistics
+  * @param statsTracker source of record-size, ingestion-latency and consumption-rate measurements
+  */
+ public KafkaProduceRateTracker(WorkUnitState state, List<KafkaPartition> partitions, WatermarkTracker watermarkTracker,
+     KafkaExtractorStatsTracker statsTracker) {
+   this(state, partitions, watermarkTracker, statsTracker, System.currentTimeMillis());
+ }
+
+ /**
+  * Same as the public constructor, but lets tests pin the start of the first reporting window.
+  *
+  * @param lastReportTimeMillis epoch millis treated as the time of the previous report
+  */
+ @VisibleForTesting
+ KafkaProduceRateTracker(WorkUnitState state, List<KafkaPartition> partitions, WatermarkTracker watermarkTracker,
+     KafkaExtractorStatsTracker statsTracker, Long lastReportTimeMillis) {
+   this.watermarkTracker = watermarkTracker;
+   this.statsTracker = statsTracker;
+   this.lastReportTimeMillis = lastReportTimeMillis;
+
+   //Every partition starts with a produce rate of -1 (not yet available); duplicates keep their first entry.
+   this.partitionsToProdRate = (LinkedHashMap<KafkaPartition, Double>) partitions.stream()
+       .collect(Collectors.toMap(Function.identity(), partition -> Double.valueOf(-1), (first, second) -> first,
+           LinkedHashMap::new));
+
+   this.exponentialDecayFactor = state.getPropAsDouble(KAFKA_PRODUCE_RATE_EXPONENTIAL_DECAY_FACTOR_KEY,
+       DEFAULT_KAFKA_PRODUCE_RATE_EXPONENTIAL_DECAY_FACTOR);
+   this.flushIntervalSecs = state.getPropAsLong(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY,
+       FlushingExtractor.DEFAULT_FLUSH_INTERVAL_SECONDS);
+   this.disableStatsOnHolidays = state.getPropAsBoolean(KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY,
+       DEFAULT_KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS);
+   this.holidayLocale = state.getProp(KAFKA_PRODUCE_RATE_HOLIDAY_LOCALE_KEY, DEFAULT_KAFKA_PRODUCE_RATE_HOLIDAY_LOCALE);
+
+   //Use the holiday calendar bundled on the classpath when present, otherwise the library default.
+   URL calendarFileUrl = getClass().getClassLoader().getResource(HOLIDAY_FILE);
+   if (calendarFileUrl != null) {
+     this.holidayManager = HolidayManager.getInstance(new UrlManagerParameter(calendarFileUrl, new Properties()));
+   } else {
+     this.holidayManager = HolidayManager.getInstance();
+   }
+ }
+
+ /**
+  * Returns the hour of day of {@code date} in the JVM default time zone and locale.
+  *
+  * @param date the instant to inspect
+  * @return the hour of day, from 0 to 23
+  */
+ public static int getHourOfDay(Date date) {
+   final Calendar cal = Calendar.getInstance();
+   cal.setTimeInMillis(date.getTime());
+   return cal.get(Calendar.HOUR_OF_DAY);
+ }
+
+ /**
+  * Returns the zero-based day of week of {@code date} in the JVM default time zone and locale.
+  *
+  * @param date the instant to inspect
+  * @return {@link Calendar#DAY_OF_WEEK} shifted from its 1-7 range down to 0-6, so Sunday is 0 and Saturday is 6
+  */
+ public static int getDayOfWeek(Date date) {
+   final Calendar cal = Calendar.getInstance();
+   cal.setTimeInMillis(date.getTime());
+   final int oneBasedDay = cal.get(Calendar.DAY_OF_WEEK);
+   return oneBasedDay - 1;
+ }
+
+ @Getter
+ @AllArgsConstructor
+ public static class TopicPartitionStats { // Statistics computed for one topic partition per flush; copied into that partition's KafkaWatermark.
+ private double[][] avgProduceRates; // Moving-average produce rates indexed as [dayOfWeek 0-6][hourOfDay 0-23]; -1 marks a slot with no data yet. May be null.
+ private double avgConsumeRate; // Moving average of the consumption rate; only refreshed while the consumer is considered backlogged.
+ private long avgRecordSize; // Moving average of the record size reported by the stats tracker.
+ // Produce rate observed in the most recent reporting window. 0 when the previous max offset was 0,
+ // and -1 when the latest offsets could not be fetched.
+ private double currentProduceRate;
+ }
+
+ private TopicPartitionStats getNewTopicPartitionStats(Long previousMaxOffset, Long maxOffset, KafkaStreamingExtractor.KafkaWatermark lastCommittedWatermark,
+ long currentTimeMillis, Long avgRecordSize) {
+ if (previousMaxOffset == 0) {
+ TopicPartitionStats stats = new TopicPartitionStats(lastCommittedWatermark.getAvgProduceRates(), lastCommittedWatermark.getAvgConsumeRate(),
+ lastCommittedWatermark.getAvgRecordSize(), 0);
+ return stats;
+ }
+ long numRecordsProduced = maxOffset - previousMaxOffset;
+ long newAvgRecordSize;
+ if (numRecordsProduced > 0) {
+ if (lastCommittedWatermark.getAvgRecordSize() > 0) {
+ newAvgRecordSize = updateMovingAverage(avgRecordSize, lastCommittedWatermark.getAvgRecordSize()).longValue();
+ } else {
+ //No previously recorded average record size.
+ newAvgRecordSize = avgRecordSize;
+ }
+ } else {
+ //No records see in the current window. No need to update the average record size.
+ newAvgRecordSize = lastCommittedWatermark.getAvgRecordSize();
+ }
+ Date date = new Date(currentTimeMillis);
+ int hourOfDay = getHourOfDay(date);
+ int dayOfWeek = getDayOfWeek(date);
+ double currentProduceRate =
+ (numRecordsProduced * avgRecordSize) * 1000 / (double) (currentTimeMillis - lastReportTimeMillis + 1);
+ double[][] historicProduceRates = lastCommittedWatermark.getAvgProduceRates();
+ if (!isHoliday(new LocalDate(currentTimeMillis, DEFAULT_TIME_ZONE))) {
+ if (historicProduceRates != null) {
+ if (historicProduceRates[dayOfWeek][hourOfDay] >= 0) {
+ historicProduceRates[dayOfWeek][hourOfDay] =
+ updateMovingAverage(currentProduceRate, historicProduceRates[dayOfWeek][hourOfDay]);
+ } else {
+ historicProduceRates[dayOfWeek][hourOfDay] = currentProduceRate;
+ }
+ } else {
+ //No previous values found. Bootstrap with the average rate computed in the current window.
+ historicProduceRates = new double[7][24];
+ for (double[] row : historicProduceRates) {
+ Arrays.fill(row, -1.0);
+ }
+ historicProduceRates[dayOfWeek][hourOfDay] = currentProduceRate;
+ }
+ }
+
+ double consumeRate = lastCommittedWatermark.getAvgConsumeRate();
+ ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.SECONDS));
+ consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
+
+ if (isConsumerBacklogged()) {
+ //If ingestion latency is high, it means the consumer is backlogged. Hence, its current consumption rate
+ //must equal the peak consumption rate.
+ consumeRate = consumeRate >= 0 ? updateMovingAverage(getPenultimateElement(consumptionRateMBps), consumeRate)
+ : this.statsTracker.getConsumptionRateMBps();
+ }
+ return new TopicPartitionStats(historicProduceRates, consumeRate, newAvgRecordSize, currentProduceRate);
+ }
+
+ private boolean isConsumerBacklogged() {
+ if (this.ingestionLatencies.size() < SLIDING_WINDOW_SIZE) {
+ return false;
+ }
+ for (long latency: this.ingestionLatencies) {
+ if (latency < (2 * this.flushIntervalSecs)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns the element before the final element. It does this by removing the oldest element and peeking at the
+ * next element.
+ * @param queue
+ * @return
+ */
+ static Double getPenultimateElement(EvictingQueue<Double> queue) {
+ Preconditions.checkArgument(queue.size() > 1);
+ queue.remove();
+ return queue.peek();
+ }
+
+ /**
+ * A method that computes a new moving average from previous average estimate and current value using an
+ * Exponentially weighted moving average (EWMA) algorithm.
+ * @param currentValue
+ * @param previousAverage
+ * @return updated moving average computed as an EWMA.
+ */
+ private Double updateMovingAverage(double currentValue, double previousAverage) {
+ return exponentialDecayFactor * currentValue + (1 - exponentialDecayFactor) * previousAverage;
+ }
+
+ /**
+  * Several side effects in this method:
+  * 1. Write ProduceRate of each KafkaPartition into its watermark as the method name indicates.
+  * 2. Update {@link #partitionsToProdRate} for each KafkaPartitions with their newest ProduceRate, this would be
+  * part of GTE to be emitted as each flush happens.
+  * 3. Register the last committed watermark as unacknowledged for partitions that saw no record in the current
+  * window, so that the refreshed statistics are checkpointed with it.
+  * 4. Move the start of the next reporting window to {@code currentTimeMillis}.
+  *
+  * @param latestOffsetMap latest offset per partition; a missing entry is treated as "offset unavailable"
+  * @param lastCommittedWatermarks last committed watermark per partition, keyed by {@link KafkaPartition#toString()}
+  * @param highWatermark previous max offsets, indexed by partition index
+  * @param currentTimeMillis end of the current reporting window in epoch millis
+  */
+ public void writeProduceRateToKafkaWatermarks(Map<KafkaPartition, Long> latestOffsetMap, Map<String, CheckpointableWatermark> lastCommittedWatermarks,
+     MultiLongWatermark highWatermark, long currentTimeMillis) {
+   Map<String, CheckpointableWatermark> unacknowledgedWatermarks = watermarkTracker.getAllUnacknowledgedWatermarks();
+   int nextPartitionIndex = 0;
+   for (KafkaPartition partition : this.partitionsToProdRate.keySet()) {
+     //The index must advance for every partition, including the ones skipped below; otherwise all following
+     //partitions would read the stats-tracker and high-watermark slots of their predecessors.
+     final int partitionIndex = nextPartitionIndex++;
+     long maxOffset = latestOffsetMap.getOrDefault(partition, -1L);
+
+     KafkaStreamingExtractor.KafkaWatermark kafkaWatermark =
+         (KafkaStreamingExtractor.KafkaWatermark) lastCommittedWatermarks.get(partition.toString());
+     KafkaStreamingExtractor.KafkaWatermark unacknowledgedWatermark =
+         (KafkaStreamingExtractor.KafkaWatermark) unacknowledgedWatermarks.get(partition.toString());
+     if (unacknowledgedWatermark == null && kafkaWatermark == null) {
+       //If there is no previously committed watermark for the topic partition and no records in current time window, no further
+       //processing needed for the topic partition.
+       continue;
+     }
+     if (kafkaWatermark == null) {
+       //If there is no previously committed watermark for the topic partition, create a dummy watermark for computing stats
+       kafkaWatermark = new KafkaStreamingExtractor.KafkaWatermark(partition, new LongWatermark(0L));
+     }
+     long avgRecordSize = this.statsTracker.getAvgRecordSize(partitionIndex);
+     long previousMaxOffset = highWatermark.get(partitionIndex);
+     //If maxOffset < 0, it means that we could not get max offsets from Kafka due to metadata fetch failure.
+     // In this case, carry previous state forward and set produce-rate to negative value, indicating it's not available.
+     TopicPartitionStats stats =
+         maxOffset >= 0 ? getNewTopicPartitionStats(previousMaxOffset, maxOffset, kafkaWatermark, currentTimeMillis,
+             avgRecordSize)
+             : new TopicPartitionStats(kafkaWatermark.getAvgProduceRates(), kafkaWatermark.getAvgConsumeRate(), kafkaWatermark.getAvgRecordSize(), -1);
+
+     if (unacknowledgedWatermark == null) {
+       //If no record seen for this topicPartition in the current time window; carry forward the previously committed
+       // watermark with the updated statistics
+       unacknowledgedWatermark = kafkaWatermark;
+       watermarkTracker.unacknowledgedWatermark(unacknowledgedWatermark);
+     }
+     unacknowledgedWatermark.setAvgProduceRates(stats.getAvgProduceRates());
+     unacknowledgedWatermark.setAvgConsumeRate(stats.getAvgConsumeRate());
+     unacknowledgedWatermark.setAvgRecordSize(stats.getAvgRecordSize());
+     partitionsToProdRate.put(partition, stats.getCurrentProduceRate());
+   }
+   this.lastReportTimeMillis = currentTimeMillis;
+ }
+
+ /**
+  * Tells whether produce-rate statistics should be skipped for {@code date} because it is a holiday.
+  * Lookups against the holiday calendar are cached per date.
+  *
+  * @param date the date to check
+  * @return {@code false} whenever disabling of stats on holidays is not configured; otherwise {@code true} if and
+  * only if {@code date} is a holiday for the configured holiday locale
+  */
+ boolean isHoliday(LocalDate date) {
+   if (!this.disableStatsOnHolidays) {
+     return false;
+   }
+   return this.holidayMap.computeIfAbsent(date, day -> this.holidayManager.isHoliday(day, this.holidayLocale));
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
new file mode 100644
index 0000000..8412592
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.avro.Schema;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.google.gson.JsonElement;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.ComparableWatermark;
+import org.apache.gobblin.source.extractor.Watermark;
+import org.apache.gobblin.source.extractor.WatermarkSerializerHelper;
+import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.FlushRecordEnvelope;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.source.extractor.extract.kafka.KafkaProduceRateTracker.KAFKA_PARTITION_PRODUCE_RATE_KEY;
+import static org.apache.gobblin.source.extractor.extract.kafka.KafkaSource.DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS;
+import static org.apache.gobblin.source.extractor.extract.kafka.KafkaSource.GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS;
+import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker.NUM_PARTITIONS_ASSIGNED;
+
+/**
+ * An implementation of {@link org.apache.gobblin.source.extractor.Extractor} which reads from Kafka and returns records .
+ * Type of record depends on deserializer set.
+ */
+@Slf4j
+public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableKafkaRecord> {
+ public static final String DATASET_KEY = "dataset";
+ public static final String DATASET_PARTITION_KEY = "datasetPartition";
+ private static final Long MAX_LOG_DECODING_ERRORS = 5L;
+ private static final String KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY =
+ "gobblin.kafka.extractor.statsReportingIntervalMinutes";
+ private static final Long DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES = 1L;
+
+ private final ClassAliasResolver<GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory>
+ kafkaConsumerClientResolver;
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ private final Map<String, AtomicDouble> consumerMetricsGauges = new ConcurrentHashMap<>();
+ private final KafkaExtractorStatsTracker statsTracker;
+ private final KafkaProduceRateTracker produceRateTracker;
+ private final List<KafkaPartition> partitions;
+ private final long extractorStatsReportingTimeIntervalMillis;
+ //Mapping from Kafka Partition Id to partition index
+ @Getter
+ private final Map<Integer, Integer> partitionIdToIndexMap;
+ private final String recordCreationTimestampFieldName;
+ private final TimeUnit recordCreationTimestampUnit;
+
+ private Iterator<KafkaConsumerRecord> messageIterator = null;
+ private long readStartTime;
+ private long lastExtractorStatsReportingTime;
+ private Map<KafkaPartition, Long> latestOffsetMap = Maps.newHashMap();
+
+ protected MultiLongWatermark lowWatermark;
+ protected MultiLongWatermark highWatermark;
+ protected MultiLongWatermark nextWatermark;
+ protected Map<Integer, DecodeableKafkaRecord> perPartitionLastSuccessfulRecord;
+ private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+ @Override
+ public void shutdown() {
+ this.shutdownRequested.set(true); // Checked at the top of readRecordEnvelopeImpl(), which returns null once the flag is set.
+ }
+
+ /**
+  * A {@link CheckpointableWatermark} for a single Kafka topic partition. Besides the offset it carries the
+  * produce/consume statistics written by {@link KafkaProduceRateTracker}, so that they are checkpointed together
+  * with the offset. Ordering, equality and hash code ignore the statistics.
+  */
+ @ToString
+ public static class KafkaWatermark implements CheckpointableWatermark {
+   @Getter
+   KafkaPartition topicPartition;
+   LongWatermark _lwm;
+   //Average TopicPartition Produce Rate by hour-of-day and day-of-week in bytes/sec.
+   @Getter
+   @Setter
+   double[][] avgProduceRates;
+   //Average consume rate for the topic when backlogged. -1 until a value has been recorded.
+   @Getter
+   @Setter
+   double avgConsumeRate = -1.0;
+   @Getter
+   @Setter
+   long avgRecordSize;
+
+   @VisibleForTesting
+   public KafkaWatermark(KafkaPartition topicPartition, LongWatermark lwm) {
+     this.topicPartition = topicPartition;
+     _lwm = lwm;
+   }
+
+   /**
+    * @return the string form of the topic partition, used as the watermark source name
+    */
+   @Override
+   public String getSource() {
+     return topicPartition.toString();
+   }
+
+   @Override
+   public ComparableWatermark getWatermark() {
+     return _lwm;
+   }
+
+   /**
+    * Percent completion is not tracked for this watermark.
+    *
+    * @return always 0
+    */
+   @Override
+   public short calculatePercentCompletion(Watermark lowWatermark, Watermark highWatermark) {
+     return 0;
+   }
+
+   @Override
+   public JsonElement toJson() {
+     return WatermarkSerializerHelper.convertWatermarkToJson(this);
+   }
+
+   /**
+    * Orders two watermarks of the same topic partition by offset.
+    *
+    * @throws IllegalArgumentException if {@code o} is not a {@link KafkaWatermark} of the same topic partition
+    */
+   @Override
+   public int compareTo(CheckpointableWatermark o) {
+     Preconditions.checkArgument(o instanceof KafkaWatermark);
+     KafkaWatermark ko = (KafkaWatermark) o;
+     Preconditions.checkArgument(topicPartition.equals(ko.topicPartition));
+     return _lwm.compareTo(ko._lwm);
+   }
+
+   /**
+    * Two watermarks are equal when they belong to the same topic partition and carry the same offset.
+    * Watermarks of different partitions compare as unequal; they must not go through
+    * {@link #compareTo(CheckpointableWatermark)}, which rejects them with an exception.
+    */
+   @Override
+   public boolean equals(Object obj) {
+     if (this == obj) {
+       return true;
+     }
+     if (!(obj instanceof KafkaWatermark)) {
+       return false;
+     }
+     KafkaWatermark other = (KafkaWatermark) obj;
+     return topicPartition.equals(other.topicPartition) && _lwm.compareTo(other._lwm) == 0;
+   }
+
+   @Override
+   public int hashCode() {
+     final int prime = 31;
+     return topicPartition.hashCode() * prime + _lwm.hashCode();
+   }
+
+   public LongWatermark getLwm() {
+     return _lwm;
+   }
+ }
+
+ AtomicLong _rowCount = new AtomicLong(0);
+ protected final Optional<KafkaSchemaRegistry<String, S>> _schemaRegistry;
+ protected final GobblinKafkaConsumerClient kafkaConsumerClient;
+
+ private final List<KafkaPartition> topicPartitions; // list of topic partitions assigned to this extractor
+
+ /**
+  * Builds the extractor for the topic partitions described by {@code state}: instantiates the configured Kafka
+  * consumer client, seeks it to the last committed watermarks, sets up the stats and produce-rate trackers, and
+  * schedules a task that publishes the Kafka consumer metrics as gauges once a minute.
+  *
+  * @param state the work unit state carrying the extractor configuration and partition assignment
+  * @throws RuntimeException if the consumer client factory class cannot be resolved or instantiated
+  */
+ public KafkaStreamingExtractor(WorkUnitState state) {
+   super(state);
+   this.kafkaConsumerClientResolver =
+       new ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class);
+   try {
+     this.kafkaConsumerClient = this.closer.register(
+         this.kafkaConsumerClientResolver.resolveClass(state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
+             DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS))
+             .newInstance()
+             .create(ConfigUtils.propertiesToConfig(state.getProperties())));
+   } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+     throw new RuntimeException(e);
+   }
+
+   this._schemaRegistry = state.contains(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) ? Optional.of(
+       KafkaSchemaRegistry.<String, S>get(state.getProperties())) : Optional.<KafkaSchemaRegistry<String, S>>absent();
+
+   this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
+   this.kafkaConsumerClient.assignAndSeek(topicPartitions, getTopicPartitionWatermarks(this.topicPartitions));
+   this.messageIterator = this.kafkaConsumerClient.consume();
+
+   this.partitions = KafkaUtils.getPartitions(state);
+   this.partitionIdToIndexMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
+   try {
+     this.latestOffsetMap = this.kafkaConsumerClient.getLatestOffsets(this.partitions);
+   } catch (KafkaOffsetRetrievalFailureException e) {
+     //Best effort: the map stays empty and is refreshed on the next flush acknowledgement.
+     log.error("Unable to retrieve latest offsets", e);
+   }
+   this.statsTracker = new KafkaExtractorStatsTracker(state, partitions);
+   this.produceRateTracker = new KafkaProduceRateTracker(state, partitions, getWatermarkTracker(), statsTracker);
+   this.extractorStatsReportingTimeIntervalMillis =
+       state.getPropAsLong(KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY,
+           DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES) * 60 * 1000;
+   resetExtractorStatsAndWatermarks(true);
+
+   //Schedule a thread for reporting Kafka consumer metrics
+   this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+     try {
+       Map<String, Metric> codahaleMetricMap = kafkaConsumerClient.getMetrics();
+       for (Map.Entry<String, Metric> metricEntry : codahaleMetricMap.entrySet()) {
+         if (log.isDebugEnabled()) {
+           log.debug("Metric name: {}, Value: {}", metricEntry.getKey(),
+               ((Gauge<Double>) metricEntry.getValue()).getValue());
+         }
+         consumerMetricsGauges.computeIfAbsent(metricEntry.getKey(), k -> {
+           AtomicDouble d = new AtomicDouble();
+           ContextAwareGauge<Double> consumerMetricGauge =
+               getMetricContext().newContextAwareGauge(metricEntry.getKey(), () -> d.get());
+           getMetricContext().register(metricEntry.getKey(), consumerMetricGauge);
+           return d;
+         }).set(((Gauge<Double>) metricEntry.getValue()).getValue());
+       }
+     } catch (RuntimeException e) {
+       //An exception escaping a fixed-rate task suppresses all of its future executions; log and keep reporting.
+       log.warn("Failed to report Kafka consumer metrics", e);
+     }
+   }, 0, 60, TimeUnit.SECONDS);
+
+   this.recordCreationTimestampFieldName =
+       this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, null);
+   this.recordCreationTimestampUnit = TimeUnit.valueOf(
+       this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
+ }
+
+ /**
+  * Looks up the committed watermark of every given partition in the watermark storage.
+  *
+  * @param topicPartitions partitions assigned to this extractor
+  * @return the committed offset per partition, or an offset of 0 for partitions with no committed watermark
+  */
+ private Map<KafkaPartition, LongWatermark> getTopicPartitionWatermarks(List<KafkaPartition> topicPartitions) {
+   List<String> sources = topicPartitions.stream().map(KafkaPartition::toString).collect(Collectors.toList());
+   // read watermarks from storage
+   Map<String, CheckpointableWatermark> committed = super.getCommittedWatermarks(KafkaWatermark.class, sources);
+
+   Map<KafkaPartition, LongWatermark> startingOffsets = new HashMap<>();
+   for (KafkaPartition partition : topicPartitions) {
+     String source = partition.toString();
+     LongWatermark start = committed.containsKey(source)
+         ? ((KafkaWatermark) committed.get(source)).getLwm()
+         : new LongWatermark(0L);
+     startingOffsets.put(partition, start);
+   }
+   return startingOffsets;
+ }
+
+ /**
+  * Reads the topic partitions this extractor is responsible for from the work unit.
+  *
+  * @param state work unit state whose work unit carries the topic name, the number of assigned partitions and one
+  * partition-id property per assigned partition
+  * @return the assigned partitions in property order; empty when no partition count or no topic name is set
+  */
+ private List<KafkaPartition> getTopicPartitionsFromWorkUnit(WorkUnitState state) {
+   List<KafkaPartition> assigned = new ArrayList<>();
+   WorkUnit workUnit = state.getWorkunit();
+
+   int partitionCount = 0;
+   if (workUnit.contains(NUM_PARTITIONS_ASSIGNED)) {
+     partitionCount = Integer.parseInt(workUnit.getProp(NUM_PARTITIONS_ASSIGNED));
+   }
+   if (partitionCount <= 0) {
+     return assigned;
+   }
+
+   //The topic name is shared by all partitions of the work unit, so it is resolved once up front.
+   String topicName = workUnit.getProp(KafkaSource.TOPIC_NAME, null);
+   if (topicName == null) {
+     log.warn("There's no topic.name property being set in workunt which could be an illegal state");
+     return assigned;
+   }
+
+   for (int index = 0; index < partitionCount; index++) {
+     int partitionId = workUnit.getPropAsInt(KafkaSource.PARTITION_ID + "." + index);
+     assigned.add(new KafkaPartition.Builder().withTopicName(topicName).withId(partitionId).build());
+   }
+   return assigned;
+ }
+
+ /**
+  * Get the schema (metadata) of the extracted data records. The latest schema registered for the topic of the
+  * first assigned partition is used.
+  *
+  * @return the schema of Kafka topic being extracted, or {@code null} if the schema registry lookup fails
+  * @throws IllegalStateException if no schema registry is configured
+  */
+ @Override
+ public S getSchema() {
+   KafkaSchemaRegistry<String, S> schemaRegistry = this._schemaRegistry.get();
+   String topicName = this.topicPartitions.get(0).getTopicName();
+   try {
+     Schema schema = (Schema) schemaRegistry.getLatestSchemaByTopic(topicName);
+     return (S) schema;
+   } catch (SchemaRegistryException e) {
+     //Report through the logger, with the topic, instead of dumping the stack trace to stderr.
+     log.error("Failed to retrieve the latest schema for topic {}", topicName, e);
+     return null;
+   }
+ }
+
+ /**
+  * Extends the tags generated by the parent class with a {@code clusterIdentifier} tag holding the cluster name
+  * reported by {@link ClustersNames}.
+  */
+ @Override
+ public List<Tag<?>> generateTags(State state) {
+   List<Tag<?>> extractorTags = super.generateTags(state);
+   extractorTags.add(new Tag<>("clusterIdentifier", ClustersNames.getInstance().getClusterName()));
+   return extractorTags;
+ }
+
+ /**
+ * Return the next record. Return null if we're shutdown.
+ *
+ * While the current message iterator is exhausted, this method keeps calling the consumer client for more
+ * messages; once more than {@code flushIntervalMillis} has passed since {@code timeOfLastFlush} without a message
+ * becoming available, it returns a {@link FlushRecordEnvelope} instead of a record.
+ *
+ * For each record it builds an envelope carrying a {@link KafkaWatermark} at the record's offset, attaches the
+ * topic-partition metadata, updates the stats tracker and advances {@code nextWatermark} for the partition.
+ *
+ * @return the next record envelope, a {@link FlushRecordEnvelope} when a flush is due, or null after shutdown
+ * @throws IOException wrapping any {@link Throwable} raised while fetching or processing a record
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOException {
+ if (this.shutdownRequested.get()) {
+ return null;
+ }
+ this.readStartTime = System.nanoTime();
+ long fetchStartTime = System.nanoTime();
+ try {
+ while (this.messageIterator == null || !this.messageIterator.hasNext()) {
+ Long currentTime = System.currentTimeMillis();
+ //It's time to flush: stop polling and hand a FlushRecordEnvelope back to the caller.
+ if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
+ return new FlushRecordEnvelope();
+ }
+ try {
+ fetchStartTime = System.nanoTime();
+ this.messageIterator = this.kafkaConsumerClient.consume();
+ } catch (Exception e) {
+ log.error("Failed to consume from Kafka", e); // Logged only; the loop retries until messages arrive or a flush is due.
+ }
+ }
+ DecodeableKafkaRecord kafkaConsumerRecord = (DecodeableKafkaRecord) this.messageIterator.next();
+
+ int partitionIndex = this.partitionIdToIndexMap.get(kafkaConsumerRecord.getPartition());
+ this.statsTracker.onFetchNextMessageBuffer(partitionIndex, fetchStartTime);
+
+ // track time for converting KafkaConsumerRecord to a RecordEnvelope
+ long decodeStartTime = System.nanoTime();
+ KafkaPartition topicPartition =
+ new KafkaPartition.Builder().withTopicName(kafkaConsumerRecord.getTopic()).withId(kafkaConsumerRecord.getPartition()).build();
+ RecordEnvelope<DecodeableKafkaRecord> recordEnvelope = new RecordEnvelope(kafkaConsumerRecord,
+ new KafkaWatermark(topicPartition, new LongWatermark(kafkaConsumerRecord.getOffset())));
+ recordEnvelope.setRecordMetadata("topicPartition", topicPartition);
+ recordEnvelope.setRecordMetadata(DATASET_KEY, topicPartition.getTopicName());
+ recordEnvelope.setRecordMetadata(DATASET_PARTITION_KEY, "" + topicPartition.getId());
+ this.statsTracker.onDecodeableRecord(partitionIndex, readStartTime, decodeStartTime,
+ kafkaConsumerRecord.getValueSizeInBytes(),
+ kafkaConsumerRecord.isTimestampLogAppend() ? kafkaConsumerRecord.getTimestamp() : 0L,
+ (this.recordCreationTimestampFieldName != null) ? kafkaConsumerRecord.getRecordCreationTimestamp(
+ this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit) : 0L);
+ this.perPartitionLastSuccessfulRecord.put(partitionIndex, kafkaConsumerRecord);
+ this.nextWatermark.set(partitionIndex, kafkaConsumerRecord.getNextOffset());
+ return recordEnvelope;
+ } catch (Throwable t) {
+ this.statsTracker.onUndecodeableRecord(0); // NOTE(review): failures are always attributed to partition index 0, whichever partition the record came from - confirm this is intended.
+ if (shouldLogError()) {
+ log.error("Error when decoding a Kafka consumer record");
+ }
+ throw new IOException("Error in extraction", t);
+ }
+ }
+
+ private boolean shouldLogError() {
+ return this.statsTracker.getUndecodableMessageCount() <= MAX_LOG_DECODING_ERRORS;
+ }
+
+ /**
+  * Invoked once a flush has been acknowledged. Refreshes the latest offsets of the assigned partitions, writes the
+  * updated produce-rate statistics into the watermarks, lets the parent class commit the watermarks, and, when
+  * instrumentation is enabled and the reporting interval has elapsed, emits tracking events with the extractor
+  * statistics and resets them.
+  *
+  * @throws IOException if the parent class fails while handling the flush acknowledgement
+  */
+ @Override
+ protected void onFlushAck() throws IOException {
+   try {
+     //Refresh the latest offsets of TopicPartitions processed by the KafkaExtractor.
+     this.latestOffsetMap = this.kafkaConsumerClient.getLatestOffsets(this.partitions);
+   } catch (KafkaOffsetRetrievalFailureException e) {
+     //The exception is passed as the trailing argument so that SLF4J logs its stack trace; a "{}" placeholder
+     //would be left unfilled because a sole trailing Throwable is not used for message formatting.
+     log.error("Unable to retrieve latest offsets", e);
+   }
+   long currentTime = System.currentTimeMillis();
+   //Update the watermarks to include the current topic partition produce rates
+   this.produceRateTracker.writeProduceRateToKafkaWatermarks(this.latestOffsetMap, getLastCommittedWatermarks(),
+       this.highWatermark, currentTime);
+
+   // Assemble additional tags to be part of GTE, for now only Partition-ProduceRate.
+   Map<KafkaPartition, Map<String, String>> additionalTags = getAdditionalTagsHelper();
+
+   //Commit offsets to the watermark storage.
+   super.onFlushAck();
+
+   //Emit GobblinTrackingEvent with current extractor stats and reset them before the next epoch starts.
+   if (this.isInstrumentationEnabled()) {
+     if (currentTime - this.lastExtractorStatsReportingTime > this.extractorStatsReportingTimeIntervalMillis) {
+       for (int partitionIndex = 0; partitionIndex < this.partitions.size(); partitionIndex++) {
+         this.statsTracker.updateStatisticsForCurrentPartition(partitionIndex, readStartTime,
+             getLastSuccessfulRecordHeaderTimestamp(partitionIndex));
+       }
+       Map<KafkaPartition, Map<String, String>> tagsForPartitions =
+           this.statsTracker.generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark, additionalTags);
+       this.statsTracker.emitTrackingEvents(getMetricContext(), tagsForPartitions);
+       this.resetExtractorStatsAndWatermarks(false);
+       this.lastExtractorStatsReportingTime = currentTime;
+     }
+   }
+ }
+
+ @Override
+ public CommitStep initCommitStep(String commitStepAlias, boolean isPrecommit) throws IOException {
+ try {
+ log.info("Instantiating {}", commitStepAlias);
+ return (CommitStep) GobblinConstructorUtils.invokeLongestConstructor(
+ new ClassAliasResolver(CommitStep.class).resolveClass(commitStepAlias), config, statsTracker);
+ } catch (ReflectiveOperationException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * A helper function to transform a Map<KafkaPartition, Double> to Map<KafkaPartition, Map<String, String>>.
+ * Each partition is mapped to a single-entry map whose key is {@code KAFKA_PARTITION_PRODUCE_RATE_KEY} and whose
+ * value is the string form of that partition's produce rate.
+ *
+ * @return the per-partition additional tags, one (mutable) tag map per partition known to the produce rate tracker
+ */
+ Map<KafkaPartition, Map<String, String>> getAdditionalTagsHelper() {
+ return produceRateTracker.getPartitionsToProdRate()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ // Use the entry's value: calling toString() on the entry itself would yield "partition=rate".
+ entry -> Stream.of(
+ new AbstractMap.SimpleEntry<>(KAFKA_PARTITION_PRODUCE_RATE_KEY, entry.getValue().toString()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
+ }
+
+ /**
+ * Records the watermarks and fetch times of the epoch that just ended into the work unit state, then re-seeds the
+ * low/high/next watermarks for the next epoch and resets the stats tracker.
+ *
+ * @param isInit when true, additionally (re)creates the per-partition bookkeeping: the last-successful-record map,
+ * the stats reporting timestamp, zeroed low/high watermarks and the partition-id-to-index map
+ */
+ @VisibleForTesting
+ public void resetExtractorStatsAndWatermarks(boolean isInit) {
+ if (isInit) {
+ //Initialize nextwatermark, highwatermark and lowwatermarks for Extractor stats reporting.
+ this.perPartitionLastSuccessfulRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.lastExtractorStatsReportingTime = System.currentTimeMillis();
+ this.lowWatermark =
+ new MultiLongWatermark(this.partitions.stream().map(partition -> 0L).collect(Collectors.toList()));
+ this.highWatermark =
+ new MultiLongWatermark(this.partitions.stream().map(partition -> 0L).collect(Collectors.toList()));
+ }
+
+ // Drop the un-indexed PREVIOUS_* properties; the per-partition (indexed) variants are written in the loop below.
+ this.workUnitState.removeProp(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME);
+ this.workUnitState.removeProp(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME);
+ this.workUnitState.removeProp(KafkaSource.PREVIOUS_LOW_WATERMARK);
+ this.workUnitState.removeProp(KafkaSource.PREVIOUS_HIGH_WATERMARK);
+ this.workUnitState.removeProp(KafkaSource.PREVIOUS_LATEST_OFFSET);
+
+ int partitionIndex = 0;
+ for (KafkaPartition partition : partitions) {
+ if (isInit) {
+ // NOTE(review): the map is keyed by partition id only, so partitions of different topics that share an id
+ // would overwrite each other - confirm a work unit never mixes topics.
+ this.partitionIdToIndexMap.put(partition.getId(), partitionIndex);
+ }
+
+ // The PREVIOUS_* properties are written from the watermark values as they stand before this iteration
+ // updates them further down.
+ this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_HIGH_WATERMARK, partitionIndex),
+ this.highWatermark.get(partitionIndex));
+ this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LOW_WATERMARK, partitionIndex),
+ this.lowWatermark.get(partitionIndex));
+ this.workUnitState.setProp(
+ KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, partitionIndex),
+ this.statsTracker.getStatsMap().get(partitions.get(partitionIndex)).getStartFetchEpochTime());
+ this.workUnitState.setProp(
+ KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionIndex),
+ this.statsTracker.getStatsMap().get(partitions.get(partitionIndex)).getStopFetchEpochTime());
+ // NOTE(review): PREVIOUS_LATEST_OFFSET is populated from the high watermark, the same value as
+ // PREVIOUS_HIGH_WATERMARK above - confirm this is intended.
+ this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, partitionIndex),
+ this.highWatermark.get(partitionIndex));
+
+ // New low watermark: one past the last committed offset for this partition, or 0 if nothing was committed.
+ KafkaWatermark kafkaWatermark = (KafkaWatermark) this.lastCommittedWatermarks.get(partition.toString());
+ long lowWatermarkValue = 0L;
+ if (kafkaWatermark != null) {
+ lowWatermarkValue = kafkaWatermark.getLwm().getValue() + 1;
+ }
+ this.lowWatermark.set(partitionIndex, lowWatermarkValue);
+ // New high watermark: the latest known offset; left unchanged when no latest offset is available.
+ if (latestOffsetMap.containsKey(partition)) {
+ this.highWatermark.set(partitionIndex, latestOffsetMap.get(partition));
+ }
+ partitionIndex++;
+ }
+ // The next watermark starts as a copy of the freshly computed low watermark.
+ this.nextWatermark = new MultiLongWatermark(this.lowWatermark);
+
+ // Add error partition count and error message count to workUnitState
+ this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.statsTracker.getErrorPartitionCount());
+ this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT,
+ this.statsTracker.getUndecodableMessageCount());
+ this.workUnitState.setActualHighWatermark(this.nextWatermark);
+
+ //Reset stats tracker (must happen after the error counts above have been read from it)
+ this.statsTracker.reset();
+ }
+
+ /**
+ * Hook for subclasses to supply the header timestamp of the last successfully read record of a partition.
+ * This base implementation has no such timestamp and always returns 0.
+ *
+ * @param partitionId index of the partition, as passed by {@code onFlushAck()}
+ * @return 0
+ */
+ protected long getLastSuccessfulRecordHeaderTimestamp(int partitionId) {
+ return 0L;
+ }
+
+ /**
+ * Callback that asks the extractor to remove work from its plate.
+ * Not implemented yet: the body is a placeholder and nothing is removed.
+ *
+ * @param workUnitState the work unit state describing the work to remove (currently ignored)
+ * @return always false
+ */
+ public boolean onWorkUnitRemove(WorkUnitState workUnitState) {
+ // TODO: check if these topic partitions actually were part of the assignment
+ // add to queue of flush control messages
+ // set up ack on them
+ return false;
+ }
+
+ /**
+ * Callback that hands additional work to the extractor: the topic partitions described by the work unit are added
+ * to this extractor's assignment and the consumer is re-assigned over the enlarged partition list.
+ *
+ * @param workUnitState the work unit state describing the topic partitions to add
+ * @return always true
+ */
+ public boolean onWorkUnitAdd(WorkUnitState workUnitState) {
+ List<KafkaPartition> addedPartitions = getTopicPartitionsFromWorkUnit(workUnitState);
+ // Watermarks are looked up for the added partitions only, before the assignment list is extended.
+ Map<KafkaPartition, LongWatermark> addedPartitionWatermarks = getTopicPartitionWatermarks(addedPartitions);
+ this.topicPartitions.addAll(addedPartitions);
+ this.kafkaConsumerClient.assignAndSeek(this.topicPartitions, addedPartitionWatermarks);
+ return true;
+ }
+
+ /**
+ * @return the current value of the {@code _rowCount} counter
+ */
+ @Override
+ public long getExpectedRecordCount() {
+ return _rowCount.get();
+ }
+
+ /**
+ * Closes this extractor by delegating to its {@code closer}.
+ *
+ * @throws IOException if {@code closer.close()} fails
+ */
+ @Override
+ public void close() throws IOException {
+ this.closer.close();
+ }
+
+ /**
+ * Deprecated; this extractor does not expose a single high watermark through this method.
+ *
+ * @return always 0
+ */
+ @Deprecated
+ @Override
+ public long getHighWatermark() {
+ return 0;
+ }
+
+ /**
+ * @return the string form of the topic partitions currently held in {@code topicPartitions}
+ */
+ @Override
+ public String toString() {
+ return topicPartitions.toString();
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
new file mode 100644
index 0000000..aeab8e0
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.CheckpointableWatermarkState;
+import org.apache.gobblin.runtime.StateStoreBasedWatermarkStorage;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
+import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaBiLevelWorkUnitPacker.bestFitDecreasingBinPacking;
+
+/**
+ *
+ * An implementation of {@link KafkaWorkUnitPacker} that is used for streamlined Kafka ingestion, which:
+ *
+ * 1) Groups partitions of the same topic together. Multiple topics are never mixed in a base {@link WorkUnit},
+ * but may be mixed in a {@link MultiWorkUnit}
+ * 2) Doesn't assign an offset range within a WorkUnit, but provides a list of partitions (or topics) to inform the streaming
+ * {@link org.apache.gobblin.source.extractor.Extractor} of where to pull events from; it behaves as an "index" for
+ * {@link org.apache.gobblin.source.extractor.Extractor}.
+ *
+ * It is then {@link org.apache.gobblin.source.extractor.Extractor}'s responsibility to interact with
+ * {@link org.apache.gobblin.writer.WatermarkStorage} on determining offset of each
+ * {@link org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition} that it was assigned.
+ */
+@Slf4j
+public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
+ private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
+ private static final String TOPIC_PARTITION_DELIMITER = "-";
+
+ //A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a
+ //single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be
+ // packed into a single workunit. For example, if the container capacity is set to 10, and each topic partition has a
+ // weight of 1, then 10 partitions of the topic will be packed into a single workunit. This configuration is topic-independent
+ // i.e. all topics will be assumed to have the same peak consumption rate when set.
+ public static final String CONTAINER_CAPACITY_KEY = "gobblin.kafka.streaming.containerCapacity";
+ public static final double DEFAULT_CONTAINER_CAPACITY = 10;
+
+ //A boolean flag to enable per-topic container capacity, where "container capacity" is as defined earlier. This
+ // configuration is useful in scenarios where the write performance can vary significantly across topics due to differences
+ // in schema, as in the case of columnar formats such as ORC and Parquet. When enabled, the bin packing algorithm uses
+ // historic consumption rates for a given topic as tracked by the ingestion pipeline.
+
+ public static final String IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED_KEY = "gobblin.kafka.streaming.isPerTopicBinCapacityEnabled";
+ public static final Boolean DEFAULT_IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED = false;
+
+ //A topic-specific config that controls the minimum number of containers for that topic.
+ public static final String MIN_CONTAINERS_FOR_TOPIC = "gobblin.kafka.minContainersForTopic";
+ public static final String PARTITION_WATERMARK = "gobblin.kafka.partition.watermark";
+ public static final String PACKING_START_TIME_MILLIS = "gobblin.kafka.packer.packingStartTimeMillis";
+ public static final String IS_STATS_BASED_PACKING_ENABLED_KEY = "gobblin.kafka.streaming.isStatsBasedPackingEnabled";
+ public static final boolean DEFAULT_IS_STATS_BASED_PACKING_ENABLED = false;
+ public static final String CONTAINER_CAPACITY_COMPUTATION_STRATEGY_KEY = "gobblin.kafka.streaming.containerCapacityComputationStrategy";
+ public static final String DEFAULT_CONTAINER_CAPACITY_COMPUTATION_STRATEGY = ContainerCapacityComputationStrategy.MEDIAN.name();
+
+ public enum ContainerCapacityComputationStrategy {
+ MIN, MAX, MEAN, MEDIAN
+ }
+
+ private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class);
+
+ /**
+ * Configuration to enable indexing on packing.
+ */
+ private static final String INDEXING_ENABLED = "gobblin.kafka.streaming.enableIndexing";
+ private static final boolean DEFAULT_INDEXING_ENABLED = true;
+
+ private static final String METRICS_PREFIX = "binpacker.metrics.";
+
+ /**
+ * When indexing-packing is enabled, the number of partitions is important for extractor to know
+ * how many kafka partitions need to be pulled from.
+ *
+ * Set to public-static to share with Extractor.
+ * TODO: Shall be changed to package-private
+ */
+ public static final String NUM_PARTITIONS_ASSIGNED = "gobblin.kafka.streaming.numPartitions";
+ //A derived metric that defines the default workunit size, in case of workunit size cannot be estimated.
+ public static final String DEFAULT_WORKUNIT_SIZE_KEY = "gobblin.kafka.defaultWorkUnitSize";
+ //A lower bound for the workunit size.
+ public static final String MIN_WORKUNIT_SIZE_KEY = "gobblin.kafka.minWorkUnitSize";
+
+ private static final String NUM_CONTAINERS_EVENT_NAME = "NumContainers";
+
+ private final long packingStartTimeMillis;
+ private final Optional<StateStoreBasedWatermarkStorage> watermarkStorage;
+ private final Optional<MetricContext> metricContext;
+ private final boolean isStatsBasedPackingEnabled;
+ private final Boolean isPerTopicContainerCapacityEnabled;
+ private final ContainerCapacityComputationStrategy containerCapacityComputationStrategy;
+ private final Map<String, KafkaStreamingExtractor.KafkaWatermark> lastCommittedWatermarks = Maps.newHashMap();
+ private final Map<String, List<Double>> capacitiesByTopic = Maps.newHashMap();
+ private final EventSubmitter eventSubmitter;
+ private SourceState state;
+
+ /**
+ * Creates a packer configured from the given source state.
+ *
+ * @param source the source this packer works for; passed to the superclass
+ * @param state source state from which the packing configuration is read (stats-based packing flag, per-topic
+ * container capacity flag and the container capacity computation strategy)
+ * @param metricContext optional metric context used for the event submitter and, in {@code pack}, for gauges
+ * @throws IllegalArgumentException if the configured computation strategy does not name a
+ * {@link ContainerCapacityComputationStrategy} constant (case-insensitive)
+ */
+ public KafkaTopicGroupingWorkUnitPacker(AbstractSource<?, ?> source, SourceState state,
+ Optional<MetricContext> metricContext) {
+ super(source, state);
+ this.state = state;
+ this.isStatsBasedPackingEnabled =
+ state.getPropAsBoolean(IS_STATS_BASED_PACKING_ENABLED_KEY, DEFAULT_IS_STATS_BASED_PACKING_ENABLED);
+ this.isPerTopicContainerCapacityEnabled = state
+ .getPropAsBoolean(IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED_KEY, DEFAULT_IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED);
+ // Upper-case with a fixed locale: the default-locale variant breaks enum lookup under locales with special
+ // casing rules (e.g. "median" becomes "MED\u0130AN" in Turkish).
+ this.containerCapacityComputationStrategy = ContainerCapacityComputationStrategy.valueOf(
+ state.getProp(CONTAINER_CAPACITY_COMPUTATION_STRATEGY_KEY, DEFAULT_CONTAINER_CAPACITY_COMPUTATION_STRATEGY)
+ .toUpperCase(java.util.Locale.ROOT));
+ // The watermark storage is only needed (and only created) when stats-based packing is enabled.
+ this.watermarkStorage =
+ Optional.fromNullable(this.isStatsBasedPackingEnabled ? new StateStoreBasedWatermarkStorage(state) : null);
+ this.packingStartTimeMillis = System.currentTimeMillis();
+ this.metricContext = metricContext;
+ this.eventSubmitter =
+ new EventSubmitter.Builder(this.metricContext, KafkaTopicGroupingWorkUnitPacker.class.getName()).build();
+ }
+
+ /**
+ * Pack using the following strategy.
+ * - Each container has a configured capacity in terms of the cost metric.
+ * This is configured by {@value CONTAINER_CAPACITY_KEY}.
+ * - When both stats-based packing and per-topic container capacity are enabled, the capacity used for a topic is
+ * instead derived from the consume rates collected from that topic's committed watermarks. A topic for which
+ * no rates were collected (e.g. one without committed watermarks) falls back to the configured capacity.
+ * - For each topic pack the workunits into a set of topic specific buckets by filling the fullest bucket that can hold
+ * the workunit without exceeding the container capacity.
+ * - The topic specific multi-workunits are squeezed and returned as a workunit.
+ *
+ * @param workUnitsByTopic the workunits to pack, keyed by topic name
+ * @param numContainers not consulted by this packer
+ * @return the squeezed workunits
+ * @throws RuntimeException if stats cannot be read from the watermark storage
+ */
+ @Override
+ public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int numContainers) {
+ double configuredContainerCapacity =
+ this.state.getPropAsDouble(CONTAINER_CAPACITY_KEY, DEFAULT_CONTAINER_CAPACITY);
+
+ if (this.watermarkStorage.isPresent()) {
+ try {
+ addStatsToWorkUnits(workUnitsByTopic);
+ } catch (IOException e) {
+ log.error("Unable to get stats from watermark storage.");
+ throw new RuntimeException(e);
+ }
+ }
+
+ setWorkUnitEstSizes(workUnitsByTopic);
+
+ List<MultiWorkUnit> mwuGroups = Lists.newArrayList();
+
+ for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
+ String topic = entry.getKey();
+ List<WorkUnit> workUnitsForTopic = entry.getValue();
+ double containerCapacity = configuredContainerCapacity;
+ if (this.isStatsBasedPackingEnabled && this.isPerTopicContainerCapacityEnabled) {
+ List<Double> capacitiesForTopic = capacitiesByTopic.get(topic);
+ if (capacitiesForTopic != null && !capacitiesForTopic.isEmpty()) {
+ containerCapacity =
+ getContainerCapacityForTopic(capacitiesForTopic, this.containerCapacityComputationStrategy);
+ } else {
+ // No historic rates for this topic: keep the configured capacity instead of failing on a null list.
+ log.warn("No measured container capacities found for topic {}; using configured capacity", topic);
+ }
+ log.info("Container capacity for topic {}: {}", topic, containerCapacity);
+ }
+ double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic);
+ int previousSize = mwuGroups.size();
+ if (estimatedDataSizeForTopic < containerCapacity) {
+ // If the total estimated size of a topic is less than the container capacity then put all partitions of this
+ // topic in a single group.
+ MultiWorkUnit mwuGroup = MultiWorkUnit.createEmpty();
+ addWorkUnitsToMultiWorkUnit(workUnitsForTopic, mwuGroup);
+ mwuGroups.add(mwuGroup);
+ } else {
+ // Use best-fit-decreasing to group workunits for a topic into multiple groups.
+ mwuGroups.addAll(bestFitDecreasingBinPacking(workUnitsForTopic, containerCapacity));
+ }
+ int numContainersForTopic = mwuGroups.size() - previousSize;
+ log.info("Packed partitions for topic {} into {} containers", topic, Integer.toString(numContainersForTopic));
+ if (this.metricContext.isPresent()) {
+ //Report the number of containers used for each topic.
+ String metricName = METRICS_PREFIX + topic + ".numContainers";
+ ContextAwareGauge<Integer> gauge =
+ this.metricContext.get().newContextAwareGauge(metricName, () -> numContainersForTopic);
+ this.metricContext.get().register(metricName, gauge);
+
+ //Submit a container count event for the given topic
+ CountEventBuilder countEventBuilder = new CountEventBuilder(NUM_CONTAINERS_EVENT_NAME, numContainersForTopic);
+ countEventBuilder.addMetadata("topic", topic);
+ this.eventSubmitter.submit(countEventBuilder);
+ }
+ }
+
+ List<WorkUnit> squeezedGroups = squeezeMultiWorkUnits(mwuGroups);
+ log.debug("Squeezed work unit groups: " + squeezedGroups);
+ return squeezedGroups;
+ }
+
+ /**
+ * TODO: This method should be moved into {@link KafkaSource}, which requires moving classes such
+ * as {@link KafkaStreamingExtractor.KafkaWatermark} to the open source.
+ *
+ * Reads all committed watermarks from the watermark storage and attaches to every workunit the watermark of its
+ * topic partition (serialized as JSON), the packing start time, the default workunit size and the minimum
+ * workunit size.
+ *
+ * A side-effect of this method is to populate a map (called "capacitiesByTopic") from topic name to the consume
+ * rates recorded in the committed watermarks of that topic's partitions (only when per-topic container capacity
+ * is enabled). A non-positive recorded rate is replaced by {@code DEFAULT_CONTAINER_CAPACITY}. These values are
+ * later reduced to a single per-topic container capacity using the configured
+ * {@link ContainerCapacityComputationStrategy}.
+ *
+ * @param workUnitsByTopic the workunits to annotate, keyed by topic name
+ * @throws IOException if the committed watermarks cannot be read
+ */
+ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) throws IOException {
+ for (CheckpointableWatermarkState state : this.watermarkStorage.get().getAllCommittedWatermarks()) {
+ String topicPartition = state.getSource();
+ KafkaStreamingExtractor.KafkaWatermark watermark =
+ GSON.fromJson(state.getProp(topicPartition), KafkaStreamingExtractor.KafkaWatermark.class);
+ lastCommittedWatermarks.put(topicPartition, watermark);
+ // Gson returns null for missing/"null" JSON; such an entry carries no rate to record.
+ if (this.isPerTopicContainerCapacityEnabled && watermark != null) {
+ // Strip only the trailing "<delimiter><partition id>": topic names may themselves contain the delimiter,
+ // so splitting on the first occurrence would truncate names such as "my-topic".
+ int delimiterIndex = topicPartition.lastIndexOf(TOPIC_PARTITION_DELIMITER);
+ String topicName = delimiterIndex > 0 ? topicPartition.substring(0, delimiterIndex) : topicPartition;
+ List<Double> capacities = capacitiesByTopic.getOrDefault(topicName, Lists.newArrayList());
+ capacities.add(watermark.getAvgConsumeRate() > 0 ? watermark.getAvgConsumeRate() : DEFAULT_CONTAINER_CAPACITY);
+ capacitiesByTopic.put(topicName, capacities);
+ }
+ }
+
+ for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
+ String topic = entry.getKey();
+ List<WorkUnit> workUnits = entry.getValue();
+ for (WorkUnit workUnit : workUnits) {
+ int partitionId = Integer.parseInt(workUnit.getProp(KafkaSource.PARTITION_ID));
+ // Build the partition before taking its string form; the builder's own toString() is not the lookup key.
+ String topicPartition =
+ new KafkaPartition.Builder().withTopicName(topic).withId(partitionId).build().toString();
+ // May be null when nothing has been committed for this partition; it is then serialized as JSON null.
+ KafkaStreamingExtractor.KafkaWatermark watermark = lastCommittedWatermarks.get(topicPartition);
+ workUnit.setProp(PARTITION_WATERMARK, GSON.toJson(watermark));
+ workUnit.setProp(PACKING_START_TIME_MILLIS, this.packingStartTimeMillis);
+ workUnit.setProp(DEFAULT_WORKUNIT_SIZE_KEY, getDefaultWorkUnitSize());
+ workUnit.setProp(MIN_WORKUNIT_SIZE_KEY, getMinWorkUnitSize(workUnit));
+ }
+ }
+ }
+
+ /**
+ * @return the workunit size to assume when none can be estimated: the configured container capacity
+ * ({@code CONTAINER_CAPACITY_KEY}, defaulting to {@code DEFAULT_CONTAINER_CAPACITY}) divided by
+ * {@code DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER}
+ */
+ private Double getDefaultWorkUnitSize() {
+ double containerCapacity = state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
+ KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY);
+ return containerCapacity / DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER;
+ }
+
+ /**
+ * A helper method that configures the minimum workunit size for each topic partition based on the lower bound of
+ * the number of containers to be used for the topic.
+ *
+ * The minimum size is the configured container capacity divided by the maximum number of partitions that may
+ * share a container, i.e. {@code numTopicPartitions / minContainersForTopic} (integer division, but never less
+ * than 1 so that a minimum larger than the partition count yields one partition per container rather than a
+ * division by zero).
+ *
+ * @param workUnit the workunit carrying {@code MIN_CONTAINERS_FOR_TOPIC} and {@code KafkaSource.NUM_TOPIC_PARTITIONS}
+ * @return the minimum workunit size, or 0 when no positive minimum number of containers is configured.
+ */
+ private Double getMinWorkUnitSize(WorkUnit workUnit) {
+ int minContainersForTopic = workUnit.getPropAsInt(MIN_CONTAINERS_FOR_TOPIC, -1);
+ if (minContainersForTopic <= 0) {
+ //No (meaningful) minimum configured? Return lower bound for workunit size to be 0.
+ return 0.0;
+ }
+
+ //Compute the maximum number of partitions to be packed into each container.
+ int maxNumPartitionsPerContainer =
+ Math.max(1, workUnit.getPropAsInt(KafkaSource.NUM_TOPIC_PARTITIONS) / minContainersForTopic);
+ return state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
+ KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / maxNumPartitionsPerContainer;
+ }
+
+ /**
+ * Collapses each topic-specific {@link MultiWorkUnit} into one "indexed" {@link WorkUnit} that only records which
+ * topic partitions it covers; no per-partition offsets are assigned to it. When indexing is disabled via
+ * {@code INDEXING_ENABLED}, the superclass implementation is used instead.
+ *
+ * @param multiWorkUnits the multi-workunits to squeeze
+ * @return one workunit per multi-workunit when indexing is enabled, otherwise the superclass result
+ */
+ @Override
+ protected List<WorkUnit> squeezeMultiWorkUnits(List<MultiWorkUnit> multiWorkUnits) {
+ if (!state.getPropAsBoolean(INDEXING_ENABLED, DEFAULT_INDEXING_ENABLED)) {
+ return super.squeezeMultiWorkUnits(multiWorkUnits);
+ }
+
+ List<WorkUnit> squeezed = new ArrayList<>();
+ // Suffix appended to the task output directory so that each workunit writes to its own directory and
+ // multiple flush publishers never attempt to move the same file.
+ int outputDirSuffix = 0;
+ for (MultiWorkUnit group : multiWorkUnits) {
+ // The first workunit of the group is reused as the carrier of the index.
+ WorkUnit carrier = group.getWorkUnits().get(0);
+ List<KafkaPartition> assignedPartitions = getPartitionsFromMultiWorkUnit(group);
+
+ // Record every topic/partition of the group on the carrier.
+ populateMultiPartitionWorkUnit(assignedPartitions, carrier);
+
+ // Tell the extractor how many partitions it has been assigned.
+ carrier.setProp(NUM_PARTITIONS_ASSIGNED, assignedPartitions.size());
+
+ String baseOutputDir = state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR);
+ Path uniqueOutputDir = new Path(baseOutputDir, Integer.toString(outputDirSuffix));
+ outputDirSuffix++;
+ carrier.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, uniqueOutputDir);
+ squeezed.add(carrier);
+ }
+ return squeezed;
+ }
+
+ /**
+ * A method that returns the container capacity for a given topic given the previously measured capacities and
+ * the strategy used to reduce them to a single value. The input list is not modified.
+ *
+ * @param capacities measured container capacities derived from watermarks; must be non-empty
+ * @param strategy the algorithm to derive the container capacity from prior measurements
+ * @return the container capacity obtained using the given {@link ContainerCapacityComputationStrategy}.
+ * @throws IllegalArgumentException if {@code capacities} is empty
+ */
+ @VisibleForTesting
+ static double getContainerCapacityForTopic(List<Double> capacities, ContainerCapacityComputationStrategy strategy) {
+ Preconditions.checkArgument(capacities.size() > 0, "Capacities size must be > 0");
+ // Sort a private copy: the caller's list may be shared, or unmodifiable.
+ List<Double> sortedCapacities = new ArrayList<>(capacities);
+ Collections.sort(sortedCapacities);
+ log.info("Capacity computation strategy: {}, capacities: {}", strategy.name(), sortedCapacities);
+ int count = sortedCapacities.size();
+ switch (strategy) {
+ case MIN:
+ return sortedCapacities.get(0);
+ case MAX:
+ return sortedCapacities.get(count - 1);
+ case MEAN:
+ return (sortedCapacities.stream().mapToDouble(capacity -> capacity).sum()) / count;
+ case MEDIAN:
+ // Even count: average of the two middle values; odd count: the middle value.
+ return ((count % 2) == 0) ? (
+ (sortedCapacities.get(count / 2) + sortedCapacities.get(count / 2 - 1)) / 2)
+ : sortedCapacities.get(count / 2);
+ default:
+ throw new RuntimeException("Unsupported computation strategy: " + strategy.name());
+ }
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
new file mode 100644
index 0000000..6113a79
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+
+import java.util.Date;
+
+import com.google.gson.Gson;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaProduceRateTracker;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
+
+/**
+ * A {@link KafkaWorkUnitSizeEstimator} that uses historic produce rates of Kafka TopicPartitions and the current lag
+ * to determine the Workunit size. The inputs to the WorkUnitSizeEstimator are the following:
+ * <ul>
+ * <li> Current Lag in number of records (L) </li>
+ * <li> Average record size (R) </li>
+ * <li> Historic produce rates by hour-of-day and day-of-week (P) </li>
+ * <li> Target SLA to achieve zero lag (SLA) </li>
+ * </ul>
+ * Based on the current lag, historic produce rate for the Kafka TopicPartition, and
+ * a target SLA, we estimate the minimum consume rate (C) required to meet the target SLA using the following formula:
+ * C = (L * R)/SLA + P.
+ * To allow headroom for week-over-week and intra-hour variances, we scale the historic produce rates by
+ * an over-provisioning factor O. The formula is then modified to:
+ * C = (L * R)/SLA + (P * O).
+ * The calculated consumption rate C is returned as the estimated workunit size. Note that the estimated workunit size may exceed the
+ * container capacity. The bin packer is assumed to create a new bin containing only this workunit.
+ *
+ * Assumptions:
+ * <ul>
+ * <li>The container capacity is assumed to be defined in MB/s</li>
+ * <li>The topic partition produce rates are assumed to be tracked in bytes/s</li>
+ * </ul>
+ */
+@Slf4j
+public class ProduceRateAndLagBasedWorkUnitSizeEstimator implements KafkaWorkUnitSizeEstimator {
+ public static final String CATCHUP_SLA_IN_HOURS_KEY = "gobblin.kafka.catchUpSlaInHours";
+ private static final int DEFAULT_CATCHUP_SLA_IN_HOURS = 1;
+ //The interval at which workunits are re-calculated.
+ public static final String REPLANNING_INTERVAL_IN_HOURS_KEY = "gobblin.kafka.replanningIntervalInHours";
+ //Set default mode to disable time-based replanning i.e. set replanningIntervalInHours to its maximum value.
+ // In this case, the work unit weight estimation will select the maximum produce rate for the topic partition across
+ // all hours and days of week.
+ private static final int DEFAULT_KAFKA_REPLANNING_INTERVAL = 168; //24 * 7
+ //An over-provisioning factor to provide head room to allow variances in traffic e.g. sub-hour rate variances, week-over-week
+ // traffic variances etc. This config has the effect of multiplying the historic rate by this factor.
+ public static final String PRODUCE_RATE_SCALING_FACTOR_KEY = "gobblin.kafka.produceRateScalingFactor";
+ private static final Double DEFAULT_PRODUCE_RATE_SCALING_FACTOR = 1.3;
+
+ public static final int ONE_MEGA_BYTE = 1048576;
+ private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class);
+
+ private final long catchUpSlaInHours;
+ private final long replanIntervalInHours;
+ private final double produceRateScalingFactor;
+
+ /**
+ * Reads the estimator configuration from the given state, falling back to the class defaults:
+ * the catch-up SLA in hours, the replanning interval in hours and the produce rate scaling factor.
+ *
+ * @param state the source state holding the estimator configuration
+ */
+ public ProduceRateAndLagBasedWorkUnitSizeEstimator(SourceState state) {
+ this.catchUpSlaInHours = state.getPropAsLong(CATCHUP_SLA_IN_HOURS_KEY, DEFAULT_CATCHUP_SLA_IN_HOURS);
+ this.replanIntervalInHours =
+ state.getPropAsLong(REPLANNING_INTERVAL_IN_HOURS_KEY, DEFAULT_KAFKA_REPLANNING_INTERVAL);
+ this.produceRateScalingFactor =
+ state.getPropAsDouble(PRODUCE_RATE_SCALING_FACTOR_KEY, DEFAULT_PRODUCE_RATE_SCALING_FACTOR);
+ }
+
+ /**
+ * Estimates the workunit size as the consume rate needed to drain the current lag within the catch-up SLA while
+ * keeping up with the (scaled) produce rate.
+ *
+ * The lag is the workunit's high watermark minus the committed low watermark, when a committed watermark is
+ * attached to the workunit. Without a committed watermark the average record size is 0, so the lag term vanishes.
+ * When no produce rate estimate is available, the workunit's default size is used as the produce rate.
+ *
+ * @param workUnit the workunit, annotated with its partition watermark, packing start time and default size
+ * @return the target consume rate computed from the lag and the produce rate
+ */
+ @Override
+ public double calcEstimatedSize(WorkUnit workUnit) {
+ String watermarkJson = workUnit.getProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK);
+ KafkaStreamingExtractor.KafkaWatermark committedWatermark =
+ GSON.fromJson(watermarkJson, KafkaStreamingExtractor.KafkaWatermark.class);
+ String topic = workUnit.getProp(KafkaSource.TOPIC_NAME);
+ String partition = workUnit.getProp(KafkaSource.PARTITION_ID);
+
+ long offsetLag = Long.parseLong(workUnit.getProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY));
+ double[][] historicProduceRates = null;
+ long avgRecordSize = 0L;
+ if (committedWatermark != null) {
+ historicProduceRates = committedWatermark.getAvgProduceRates();
+ avgRecordSize = committedWatermark.getAvgRecordSize();
+ offsetLag -= committedWatermark.getLwm().getValue();
+ }
+
+ long packingStartTimeMillis = workUnit.getPropAsLong(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS);
+ double maxProduceRate = getMaxProduceRateUntilNextReplan(historicProduceRates, packingStartTimeMillis);
+ if (maxProduceRate < 0) {
+ //No previous estimates found; fall back to the default workunit size.
+ log.debug("No previous produce rate estimate found for {}", topic + "-" + partition);
+ maxProduceRate = workUnit.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY);
+ }
+
+ //Compute the target consume rate in MB/s: rate to drain the backlog within the SLA + scaled produce rate.
+ double backlogBytes = (double) (offsetLag * avgRecordSize);
+ long slaSecondsTimesOneMegaByte = catchUpSlaInHours * 3600 * ONE_MEGA_BYTE;
+ double catchUpRate = backlogBytes / slaSecondsTimesOneMegaByte;
+ double scaledProduceRate = maxProduceRate * produceRateScalingFactor;
+ double targetConsumeRate = catchUpRate + scaledProduceRate;
+ log.debug("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg Record size: {}, Target Consume Rate: {}",
+ topic + ":" + partition, maxProduceRate, offsetLag, avgRecordSize, targetConsumeRate);
+ return targetConsumeRate;
+ }
+
+ /**
+ * Finds the largest historic produce rate in the hourly slots starting at the packing time and extending
+ * {@code replanIntervalInHours} hours ahead, wrapping around the day and the week as needed.
+ *
+ * @param avgProduceRates historic produce rates indexed by [day of week][hour of day]; may be null
+ * @param packingTimeMillis the packing start time, in epoch milliseconds
+ * @return the maximum produce rate found divided by {@code ONE_MEGA_BYTE}, or -1.0 if {@code avgProduceRates}
+ * is null
+ */
+ private double getMaxProduceRateUntilNextReplan(double[][] avgProduceRates, long packingTimeMillis) {
+ int day = KafkaProduceRateTracker.getDayOfWeek(new Date(packingTimeMillis));
+ int hour = KafkaProduceRateTracker.getHourOfDay(new Date(packingTimeMillis));
+
+ if (avgProduceRates == null) {
+ return -1.0;
+ }
+
+ double highestRate = avgProduceRates[day][hour];
+ for (int hoursAhead = 0; hoursAhead < replanIntervalInHours; hoursAhead++) {
+ // Advance by one hour, rolling over into the next day of the week at midnight.
+ hour++;
+ if (hour >= 24) {
+ hour = 0;
+ day = (day + 1) % 7;
+ }
+ double candidateRate = avgProduceRates[day][hour];
+ if (candidateRate > highestRate) {
+ highestRate = candidateRate;
+ }
+ }
+ return highestRate / ONE_MEGA_BYTE;
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/UnitKafkaWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/UnitKafkaWorkUnitSizeEstimator.java
new file mode 100644
index 0000000..efcd8cf
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/UnitKafkaWorkUnitSizeEstimator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * A trivial {@link KafkaWorkUnitSizeEstimator} that reports the same unit size ({@code 1}) for every
+ * {@link WorkUnit}. It is meant for situations where a per-{@link WorkUnit} size estimate is of no use.
+ */
+public class UnitKafkaWorkUnitSizeEstimator implements KafkaWorkUnitSizeEstimator {
+  /** The constant size reported for every work unit. */
+  private static final double UNIT_SIZE = 1;
+
+  /**
+   * @param state the source state; accepted but never read by this estimator.
+   */
+  public UnitKafkaWorkUnitSizeEstimator(SourceState state) {
+    // Intentionally empty: there is nothing to configure.
+  }
+
+  /**
+   * @param workUnit ignored.
+   * @return always {@code 1}.
+   */
+  @Override
+  public double calcEstimatedSize(WorkUnit workUnit) {
+    return UNIT_SIZE;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
index e1635ea..03fd945 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
@@ -40,6 +41,7 @@ public class LoggingPusherTest {
TestAppender testAppender = new TestAppender();
Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
+ logger.setLevel(Level.INFO);
logger.addAppender(testAppender);
KeyValuePusher<String, String> loggingPusher =
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorUtils.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorUtils.java
new file mode 100644
index 0000000..649cb0d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.File;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.runtime.StateStoreBasedWatermarkStorage;
+import org.apache.gobblin.source.extractor.WatermarkInterval;
+import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.JOB_NAME_KEY;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WATERMARK_INTERVAL_VALUE_KEY;
+
+@Slf4j
+public class KafkaExtractorUtils {
+ private static final Integer MAX_NUM_BROKERS = 100;
+ private static final Integer MAX_NUM_TOPIC_PARTITIONS = 1024;
+
+ /**
+ * A utility method that returns a {@link WorkUnitState} which can be used to instantiate both a batch and a
+ * streaming Kafka extractor.
+ * @param topicName
+ * @param numPartitions
+ * @return
+ */
+ public static WorkUnitState getWorkUnitState(String topicName, int numPartitions) {
+ Preconditions.checkArgument(numPartitions <= MAX_NUM_TOPIC_PARTITIONS, "Num partitions assigned"
+ + "must be smaller than the maximum number of partitions of the topic");
+ WorkUnitState state = new WorkUnitState();
+ state.setProp(JOB_NAME_KEY, "testJob");
+ state.setProp(KafkaSource.TOPIC_NAME, topicName);
+ state.setProp(KafkaSource.GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, KafkaStreamTestUtils.MockKafka10ConsumerClientFactory.class.getName());
+ state.setProp(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+ KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
+ state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS, KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
+ //Need to set this property for LiKafka10ConsumerClient instantiation
+ state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, "http://dummySchemaRegistry:1000");
+ Random random = new Random();
+ for (int i=0; i<numPartitions; i++) {
+ //Assign a random partition id.
+ state.setProp(KafkaSource.PARTITION_ID + "." + i, random.nextInt(MAX_NUM_TOPIC_PARTITIONS));
+ state.setProp(KafkaSource.LEADER_ID + "." + i, random.nextInt(MAX_NUM_BROKERS));
+ state.setProp(KafkaSource.LEADER_HOSTANDPORT + "." + i, "leader-" + i + ":9091");
+ }
+ state.setProp(KafkaTopicGroupingWorkUnitPacker.NUM_PARTITIONS_ASSIGNED, numPartitions);
+ //Configure the watermark storage. We use FileContextBasedFsStateStoreFactory, since it allows for overwriting an existing
+ // state store.
+ state.setProp(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_TYPE_KEY, FileContextBasedFsStateStoreFactory.class.getName());
+ File stateStoreDir = Files.createTempDir();
+ stateStoreDir.deleteOnExit();
+ state.setProp(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_CONFIG_PREFIX + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, stateStoreDir.getAbsolutePath());
+
+ //Kafka configurations
+ state.setProp(ConfigurationKeys.KAFKA_BROKERS, "localhost:9091");
+
+ // Constructing a dummy watermark and mock client-factory-class and registry-class
+ List<Long> dummyWatermark = ImmutableList.of(new Long(1));
+ WatermarkInterval watermarkInterval =
+ new WatermarkInterval(new MultiLongWatermark(dummyWatermark), new MultiLongWatermark(dummyWatermark));
+ state.setProp(WATERMARK_INTERVAL_VALUE_KEY, watermarkInterval.toJson());
+
+ state.setWuProperties(state.getProperties(), state.getProperties());
+ return state;
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
new file mode 100644
index 0000000..2dbc351
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.joda.time.LocalDate;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.EvictingQueue;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.CheckpointableWatermark;
+import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.writer.LastWatermarkTracker;
+import org.apache.gobblin.writer.WatermarkTracker;
+
+
+public class KafkaProduceRateTrackerTest {
+  //A date the holiday test expects to be treated as a holiday (Dec 25, 2019), and one it expects not to be.
+  //Month and day are written as plain decimals: a leading zero would turn the literal into an octal one.
+  private static final LocalDate HOLIDAY_DATE = new LocalDate(2019, 12, 25);
+  private static final LocalDate NON_HOLIDAY_DATE = new LocalDate(2020, 1, 5);
+  //Epoch millis at the start of each of the two days above.
+  private static final Long HOLIDAY_TIME = HOLIDAY_DATE.toDateTimeAtStartOfDay().toInstant().getMillis();
+  private static final Long NON_HOLIDAY_TIME = NON_HOLIDAY_DATE.toDateTimeAtStartOfDay().toInstant().getMillis();
+
+  //Fixtures shared by the tests; the tracker is (re)created by the individual test methods.
+  private KafkaProduceRateTracker tracker;
+  private List<KafkaPartition> kafkaPartitions = new ArrayList<>();
+  private WatermarkTracker watermarkTracker;
+  private WorkUnitState workUnitState;
+  private KafkaExtractorStatsTracker extractorStatsTracker;
+
+  /**
+   * Creates the two partitions of "test-topic" plus the watermark tracker and extractor stats tracker that the
+   * tests share.
+   */
+  @BeforeClass
+  public void setUp() {
+    for (int partitionId = 0; partitionId < 2; partitionId++) {
+      kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(partitionId).build());
+    }
+    workUnitState = new WorkUnitState();
+    workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 5L);
+    watermarkTracker = new LastWatermarkTracker(false);
+    extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
+  }
+
+  /**
+   * Asserts that the keys of the tracker's partition-to-produce-rate map iterate in the same order as
+   * {@code partitions}.
+   */
+  private void assertTopicPartitionOrder(KafkaProduceRateTracker tracker, List<KafkaPartition> partitions) {
+    Iterator<KafkaPartition> trackedPartitions = tracker.getPartitionsToProdRate().keySet().iterator();
+    for (int idx = 0; idx < partitions.size(); idx++) {
+      Assert.assertEquals(partitions.get(idx), trackedPartitions.next());
+    }
+  }
+
+  /**
+   * Resets the stats tracker, records one decodeable record for each of the two partitions, and then asks the
+   * produce rate tracker to write produce rates into the Kafka watermarks using fixed latest offsets (35, 47),
+   * last committed offsets (5, 7) and high watermarks (20, 30).
+   */
+  private void writeProduceRateToKafkaWatermarksHelper(long readStartTime, long decodeStartTime, long currentTime) {
+    KafkaPartition firstPartition = kafkaPartitions.get(0);
+    KafkaPartition secondPartition = kafkaPartitions.get(1);
+
+    this.extractorStatsTracker.reset();
+    assertTopicPartitionOrder(tracker, kafkaPartitions);
+
+    //The second record is read and decoded one tick after the first one.
+    long secondReadStartTime = readStartTime + 1;
+    long secondDecodeStartTime = decodeStartTime + 1;
+    extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, currentTime-8000,
+        currentTime-10000);
+    extractorStatsTracker.onDecodeableRecord(1, secondReadStartTime, secondDecodeStartTime, 200, currentTime-7000,
+        currentTime-9000);
+    extractorStatsTracker.updateStatisticsForCurrentPartition(0, secondReadStartTime, currentTime - 8000);
+    extractorStatsTracker.updateStatisticsForCurrentPartition(1, secondReadStartTime, currentTime - 7000);
+
+    Map<KafkaPartition, Long> latestOffsets = Maps.newHashMap();
+    latestOffsets.put(firstPartition, 35L);
+    latestOffsets.put(secondPartition, 47L);
+
+    Map<String, CheckpointableWatermark> committedWatermarks = Maps.newHashMap();
+    committedWatermarks.put(firstPartition.toString(),
+        new KafkaStreamingExtractor.KafkaWatermark(firstPartition, new LongWatermark(5L)));
+    committedWatermarks.put(secondPartition.toString(),
+        new KafkaStreamingExtractor.KafkaWatermark(secondPartition, new LongWatermark(7L)));
+
+    MultiLongWatermark highWatermark = new MultiLongWatermark(Lists.newArrayList(20L, 30L));
+    this.tracker.writeProduceRateToKafkaWatermarks(latestOffsets, committedWatermarks, highWatermark, currentTime);
+  }
+
+  /**
+   * Verifies the unacknowledged {@link KafkaStreamingExtractor.KafkaWatermark}s of both partitions.
+   *
+   * <p>When {@code currentTime} equals {@code HOLIDAY_TIME + 10} (the timestamp used by the holiday test), the
+   * watermarks must carry no produce rates. Otherwise the slot matching {@code currentTime} must hold a positive
+   * rate, every other slot a negative one, and the average consume rate must be positive.
+   *
+   * @param currentTime the time (epoch millis) whose day-of-week / hour-of-day slot is expected to be populated.
+   */
+  private void assertKafkaWatermarks(long currentTime) {
+    Map<String, CheckpointableWatermark> unacknowledgedWatermarks = watermarkTracker.getAllUnacknowledgedWatermarks();
+    Assert.assertEquals(unacknowledgedWatermarks.size(), 2);
+
+    KafkaPartition topicPartition1 = kafkaPartitions.get(0);
+    KafkaPartition topicPartition2 = kafkaPartitions.get(1);
+
+    for (KafkaPartition topicPartition : Lists.newArrayList(topicPartition1, topicPartition2)) {
+      KafkaStreamingExtractor.KafkaWatermark kafkaWatermark =
+          (KafkaStreamingExtractor.KafkaWatermark) unacknowledgedWatermarks.get(topicPartition.toString());
+      //Fail with a readable message instead of a NullPointerException when a watermark is missing.
+      Assert.assertNotNull(kafkaWatermark, "No unacknowledged watermark for " + topicPartition);
+      if (currentTime == HOLIDAY_TIME + 10) {
+        Assert.assertNull(kafkaWatermark.avgProduceRates);
+        continue;
+      }
+
+      Assert.assertNotNull(kafkaWatermark.avgProduceRates);
+      Date date = new Date(currentTime);
+      int hourOfDay = KafkaProduceRateTracker.getHourOfDay(date);
+      int dayOfWeek = KafkaProduceRateTracker.getDayOfWeek(date);
+
+      Assert.assertTrue(kafkaWatermark.avgProduceRates[dayOfWeek][hourOfDay] > 0);
+      for (int i = 0; i < 7; i++) {
+        for (int j = 0; j < 24; j++) {
+          if (i != dayOfWeek || j != hourOfDay) {
+            Assert.assertTrue(kafkaWatermark.avgProduceRates[i][j] < 0);
+          }
+        }
+      }
+      Assert.assertTrue(kafkaWatermark.getAvgConsumeRate() > 0);
+    }
+  }
+
+  /**
+   * With holiday handling disabled, feeds the tracker one bootstrap update followed by
+   * {@code SLIDING_WINDOW_SIZE} further updates and checks the resulting watermarks.
+   */
+  @Test
+  public void testWriteProduceRateToKafkaWatermarks() {
+    long initialReadStart = System.nanoTime();
+    long initialDecodeStart = initialReadStart + 1;
+    long now = System.currentTimeMillis();
+
+    WorkUnitState trackerState = new WorkUnitState();
+    trackerState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY, false);
+    trackerState.setProp(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY, 1L);
+    this.tracker = new KafkaProduceRateTracker(trackerState, kafkaPartitions, watermarkTracker, extractorStatsTracker, now);
+
+    //Bootstrap the extractorStatsTracker
+    writeProduceRateToKafkaWatermarksHelper(initialReadStart, initialDecodeStart, now);
+
+    //Add more records and update watermarks/stats, once per slot of the sliding window
+    for (int i = 1; i <= KafkaProduceRateTracker.SLIDING_WINDOW_SIZE; i++) {
+      writeProduceRateToKafkaWatermarksHelper(initialReadStart + 1000 + i, initialDecodeStart + 1000 + i, now + i);
+    }
+
+    //Ensure kafka watermark is non-null and is > 0 for the hour-of-day and day-of-week corresponding to now
+    assertKafkaWatermarks(now + KafkaProduceRateTracker.SLIDING_WINDOW_SIZE);
+  }
+
+  /**
+   * With holiday handling enabled, checks that no produce rates are recorded for updates timestamped on the
+   * holiday, and that rates are recorded again once updates are timestamped on the non-holiday date.
+   */
+  @Test (dependsOnMethods = "testWriteProduceRateToKafkaWatermarks")
+  public void testWriteProduceRateToKafkaWatermarksWithHolidays() {
+    long holidayReadStart = TimeUnit.MILLISECONDS.toNanos(HOLIDAY_TIME);
+    long holidayDecodeStart = holidayReadStart + 1;
+    long holidayNow = HOLIDAY_TIME + 10;
+
+    WorkUnitState trackerState = new WorkUnitState();
+    trackerState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY, true);
+    trackerState.setProp(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY, 1L);
+    this.tracker =
+        new KafkaProduceRateTracker(trackerState, kafkaPartitions, watermarkTracker, extractorStatsTracker, holidayNow);
+
+    //Bootstrap the extractorStatsTracker; Holiday stats collection disabled.
+    writeProduceRateToKafkaWatermarksHelper(holidayReadStart, holidayDecodeStart, holidayNow);
+
+    //Add more records and update watermarks/stats
+    writeProduceRateToKafkaWatermarksHelper(holidayReadStart + 1000, holidayDecodeStart + 1000, holidayNow + 1);
+
+    //Since stats collection is disabled on holidays, ensure watermarks are null.
+    assertKafkaWatermarks(holidayNow);
+
+    long workdayReadStart = TimeUnit.MILLISECONDS.toNanos(NON_HOLIDAY_TIME);
+    long workdayDecodeStart = workdayReadStart + 1;
+    long workdayNow = NON_HOLIDAY_TIME + 10;
+
+    //Bootstrap the extractorStatsTracker with initial records
+    writeProduceRateToKafkaWatermarksHelper(workdayReadStart, workdayDecodeStart, workdayNow);
+
+    //Add more records and update watermarks/stats, once per slot of the sliding window
+    for (int i = 1; i <= KafkaProduceRateTracker.SLIDING_WINDOW_SIZE; i++) {
+      writeProduceRateToKafkaWatermarksHelper(workdayReadStart + 1000 + i, workdayDecodeStart + 1000 + i,
+          workdayNow + i);
+    }
+    //Ensure kafka watermark is not null and is > 0 for the hour-of-day and day-of-week corresponding to workdayNow
+    assertKafkaWatermarks(workdayNow + KafkaProduceRateTracker.SLIDING_WINDOW_SIZE);
+  }
+
+  /**
+   * Checks {@code isHoliday} for a holiday (queried twice, so the second lookup exercises the cached answer) and
+   * for a regular day.
+   */
+  @Test
+  public void testIsHoliday() {
+    WorkUnitState holidayAwareState = new WorkUnitState();
+    holidayAwareState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY, true);
+    KafkaExtractorStatsTracker statsTracker = new KafkaExtractorStatsTracker(this.workUnitState, kafkaPartitions);
+    KafkaProduceRateTracker holidayAwareTracker =
+        new KafkaProduceRateTracker(holidayAwareState, kafkaPartitions, watermarkTracker, statsTracker);
+
+    Assert.assertTrue(holidayAwareTracker.isHoliday(HOLIDAY_DATE));
+    //Ensure that the caching behavior is correct
+    Assert.assertTrue(holidayAwareTracker.isHoliday(HOLIDAY_DATE));
+    Assert.assertFalse(holidayAwareTracker.isHoliday(NON_HOLIDAY_DATE));
+  }
+
+ @Test
+ public void testGetPenultimateElement() {
+ EvictingQueue<Double> queue = EvictingQueue.create(3);
+
+ queue.add(1.0);
+ queue.add(2.0);
+ queue.add(3.0);
+
+ Double element = KafkaProduceRateTracker.getPenultimateElement(queue);
+ Assert.assertEquals(element, 2.0);
+
+ queue.add(4.0);
+ element = KafkaProduceRateTracker.getPenultimateElement(queue);
+ Assert.assertEquals(element, 3.0);
+
+ queue.add(5.0);
+ element = KafkaProduceRateTracker.getPenultimateElement(queue);
+ Assert.assertEquals(element, 4.0);
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
new file mode 100644
index 0000000..731c0b1
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.mockito.Mockito;
+
+import com.codahale.metrics.Metric;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * A collection of basic mock classes that different implementations can use to mock Kafka clients.
+ */
+public class KafkaStreamTestUtils {
+  /**
+   * A client factory that hands out the same Mockito mock of {@link AbstractBaseKafkaConsumerClient} on every
+   * call to {@link #create(Config)}, regardless of the config passed in.
+   */
+  public static class MockKafkaConsumerClientFactory
+      implements GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory {
+    static final AbstractBaseKafkaConsumerClient MOCKED_KAFKA_CLIENT =
+        Mockito.mock(AbstractBaseKafkaConsumerClient.class);
+
+    @Override
+    public GobblinKafkaConsumerClient create(Config config) {
+      return MockKafkaConsumerClientFactory.MOCKED_KAFKA_CLIENT;
+    }
+  }
+
+ public static class MockKafka10ConsumerClientFactory
+ implements GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory {
+ @Override
+ public GobblinKafkaConsumerClient create(Config config) {
+ return new MockKafkaConsumerClient(config);
+ }
+ }
+
+ /**
+ * A mock implementation of {@link GobblinKafkaConsumerClient} that returns a {@link MockIterator} on
+ * invocation of the {@link MockKafkaConsumerClient#consume()} method.
+ */
+ public static class MockKafkaConsumerClient implements GobblinKafkaConsumerClient {
+ public static final String NUM_PARTITIONS_ASSIGNED = "gobblin.kafka.streaming.numPartitions";
+
+ private final Map<KafkaPartition, Long> latestOffsets = Maps.newHashMap();
+ private final Random random = new Random();
+ private final String topicName;
+ private final List<Integer> partitionIds;
+
+ protected MockKafkaConsumerClient(Config baseConfig) {
+ this.topicName = baseConfig.getString(KafkaSource.TOPIC_NAME);
+ int numPartitionsAssigned = ConfigUtils.getInt(baseConfig, NUM_PARTITIONS_ASSIGNED, 0);
+ this.partitionIds = getPartitionIds(baseConfig, numPartitionsAssigned);
+ }
+
+ public MockKafkaConsumerClient() {
+ topicName = "";
+ partitionIds = Lists.newArrayList();
+ }
+
+ private List<Integer> getPartitionIds(Config baseConfig, int numPartitionsAssigned) {
+ List<Integer> partitionIds = Lists.newArrayList();
+ for (int i = 0; i < numPartitionsAssigned; i++) {
+ String partitionIdProp = KafkaSource.PARTITION_ID + "." + i;
+ partitionIds.add(baseConfig.getInt(partitionIdProp));
+ }
+ return partitionIds;
+ }
+
+ /**
+ *
+ * @return a {@link MockIterator} over {@link KafkaConsumerRecord}s.
+ */
+ @Override
+ public Iterator<KafkaConsumerRecord> consume() {
+ return new MockIterator(this.topicName, this.partitionIds);
+ }
+
+ @Override
+ public void assignAndSeek(List<KafkaPartition> topicPartitions,
+ Map<KafkaPartition, LongWatermark> topicWatermarks) {
+ return;
+ }
+
+ @Override
+ public List<KafkaTopic> getFilteredTopics(List<Pattern> blacklist, List<Pattern> whitelist) {
+ return null;
+ }
+
+ @Override
+ public long getEarliestOffset(KafkaPartition partition)
+ throws KafkaOffsetRetrievalFailureException {
+ return 0;
+ }
+
+ @Override
+ public long getLatestOffset(KafkaPartition partition)
+ throws KafkaOffsetRetrievalFailureException {
+ return 0;
+ }
+
+ /**
+ * Returns a random offset for each {@link KafkaPartition}. The method ensures that the offsets are monotonically
+ * increasing for each {@link KafkaPartition} i.e. each subsequent call to the method will return a higher offset
+ * for every partition in the partition list.
+ * @param partitions
+ * @return
+ */
+ @Override
+ public Map<KafkaPartition, Long> getLatestOffsets(Collection<KafkaPartition> partitions) {
+ for (KafkaPartition partition : partitions) {
+ if (this.latestOffsets.containsKey(partition)) {
+ this.latestOffsets.put(partition, this.latestOffsets.get(partition) + 100);
+ } else {
+ this.latestOffsets.put(partition, new Long(random.nextInt(100000)));
+ }
+ }
+ return this.latestOffsets;
+ }
+
+ @Override
+ public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
+ return null;
+ }
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ return new HashMap<>();
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+
+ }
+ }
+
+  /**
+   * A mock {@link KafkaSchemaRegistry} that serves a single "latest" schema for every topic. The schema is held
+   * in a static field, so it is shared by all instances of this class.
+   */
+  public static class MockSchemaRegistry extends KafkaSchemaRegistry<String, Schema> {
+    //Shared across instances; starts out as a plain string schema until a schema is registered by name.
+    static Schema latestSchema = Schema.create(Schema.Type.STRING);
+
+    public MockSchemaRegistry(Properties props) {
+      super(props);
+    }
+
+    /** Not supported by this mock; always returns {@code null}. */
+    @Override
+    protected Schema fetchSchemaByKey(String key) throws SchemaRegistryException {
+      return null;
+    }
+
+    /** @return the shared latest schema, irrespective of {@code topic}. */
+    @Override
+    public Schema getLatestSchemaByTopic(String topic) throws SchemaRegistryException {
+      return latestSchema;
+    }
+
+    /** Does nothing and returns {@code null}; only {@link #register(Schema, String)} records the schema. */
+    @Override
+    public String register(Schema schema) throws SchemaRegistryException {
+      return null;
+    }
+
+    /**
+     * Makes {@code schema} the shared latest schema.
+     *
+     * @return the string form of {@code schema}.
+     */
+    @Override
+    public String register(Schema schema, String name) throws SchemaRegistryException {
+      //The field is static, so assign it through the class rather than through "this".
+      MockSchemaRegistry.latestSchema = schema;
+      return schema.toString();
+    }
+  }
+
+  /**
+   * A mock of the {@code org.apache.gobblin.kafka.schemareg} flavour of the schema registry. It remembers only the
+   * most recently registered schema and returns it for every name.
+   */
+  public static class LowLevelMockSchemaRegistry
+      implements org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry<String, Schema> {
+    //The most recently registered schema; null until register() has been called.
+    private Schema latestSchema;
+
+    public LowLevelMockSchemaRegistry(Properties props) {
+      // The properties are not used by this mock.
+    }
+
+    /**
+     * Remembers {@code schema} as the latest schema.
+     *
+     * @return the string form of {@code schema}.
+     */
+    @Override
+    public String register(String name, Schema schema)
+        throws org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+      latestSchema = schema;
+      String schemaId = schema.toString();
+      return schemaId;
+    }
+
+    /** Not supported by this mock; always returns {@code null}. */
+    @Override
+    public Schema getById(String id) throws IOException, org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+      return null;
+    }
+
+    /** @return the most recently registered schema, irrespective of {@code name}. */
+    @Override
+    public Schema getLatestSchema(String name)
+        throws IOException, org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+      return latestSchema;
+    }
+
+    /** @return always {@code false}. */
+    @Override
+    public boolean hasInternalCache() {
+      return false;
+    }
+  }
+
+ /**
+ * A mock iterator of {@link KafkaConsumerRecord}s. The caller provides a topicName, a list of partition ids and
+ * optionally, the number of records the iterator must iterate over. On each call to next(), the iterator returns
+ * a mock {@link KafkaConsumerRecord}, with a partition id assigned in a round-robin fashion over the input list of
+ * partition ids.
+ */
+ public static class MockIterator implements Iterator<KafkaConsumerRecord> {
+ //Schema for LiKafka10ConsumerRecords. TODO: Enhance the iterator to return random records
+ // according to a given schema.
+ private static final String SCHEMA =
+ "{\"namespace\": \"example.avro\",\n" + " \"type\": \"record\",\n" + " \"name\": \"user\",\n"
+ + " \"fields\": [\n" + " {\"name\": \"name\", \"type\": \"string\"},\n"
+ + " {\"name\": \"DUMMY\", \"type\": [\"null\",\"string\"]}\n" + " ]\n" + "}";
+
+ private final Schema schema = new Schema.Parser().parse(SCHEMA);
+ private final Random random = new Random();
+ private final String topicName;
+ private final long maxNumRecords;
+ private final List<Integer> partitionIds;
+ private final long[] nextOffsets;
+ private long numRecordsReturnedSoFar;
+ private int partitionIdx = 0;
+
+ public MockIterator(String topicName, List<Integer> partitionIds) {
+ this(topicName, partitionIds, Long.MAX_VALUE);
+ }
+
+ public MockIterator(String topicName, List<Integer> partitionIds, long numRecords) {
+ this.topicName = topicName;
+ this.maxNumRecords = numRecords;
+ this.partitionIds = partitionIds;
+ this.nextOffsets = new long[partitionIds.size()];
+ }
+
+ /**
+ * Returns {@code true} if the iteration has more elements.
+ * (In other words, returns {@code true} if {@link #next} would
+ * return an element rather than throwing an exception.)
+ *
+ * @return {@code true} if the iteration has more elements
+ */
+ @Override
+ public boolean hasNext() {
+ return this.numRecordsReturnedSoFar < this.maxNumRecords;
+ }
+
+ /**
+ * Returns the next element in the iteration.
+ *
+ * @return the next element in the iteration
+ * @throws java.util.NoSuchElementException if the iteration has no more elements
+ */
+ @Override
+ public KafkaConsumerRecord next() {
+ this.numRecordsReturnedSoFar++;
+ return getMockConsumerRecord(this.numRecordsReturnedSoFar);
+ }
+
+ private KafkaConsumerRecord getMockConsumerRecord(long numRecordsReturnedSoFar) {
+ DecodeableKafkaRecord mockRecord = Mockito.mock(DecodeableKafkaRecord.class);
+ Mockito.when(mockRecord.getValue()).thenReturn(getRecord());
+ Mockito.when(mockRecord.getTopic()).thenReturn(topicName);
+ Mockito.when(mockRecord.getPartition()).thenReturn(this.partitionIds.get(partitionIdx));
+ this.partitionIdx = (this.partitionIdx + 1) % this.partitionIds.size();
+ //Increment the next offset of the record
+ this.nextOffsets[partitionIdx]++;
+ Mockito.when(mockRecord.getNextOffset()).thenReturn(this.nextOffsets[partitionIdx]);
+ return mockRecord;
+ }
+
+ private GenericRecord getRecord() {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("name", UUID.randomUUID());
+ return record;
+ }
+ }
+
+
+}
+
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
new file mode 100644
index 0000000..1cf7ff2
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+
+
+public class KafkaStreamingExtractorTest {
+ private KafkaStreamingExtractor streamingExtractor;
+ private final int numPartitions = 3;
+
+ @BeforeClass
+ public void setUp() {
+ WorkUnitState state = KafkaExtractorUtils.getWorkUnitState("testTopic", numPartitions);
+ state.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS, TestDataPublisher.class.getName());
+ this.streamingExtractor = new KafkaStreamingExtractor(state);
+ }
+
+ @Test
+ public void testResetExtractorStats()
+ throws IOException, DataRecordException {
+ MultiLongWatermark highWatermark1 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
+
+ //Read 2 records
+ this.streamingExtractor.readStreamEntityImpl();
+ Assert.assertEquals( this.streamingExtractor.nextWatermark.get(0), 1L);
+ Assert.assertEquals( this.streamingExtractor.nextWatermark.get(1), 0L);
+ Assert.assertEquals( this.streamingExtractor.nextWatermark.get(2), 0L);
+
+ this.streamingExtractor.readStreamEntityImpl();
+ Assert.assertEquals( this.streamingExtractor.nextWatermark.get(0), 1L);
+ Assert.assertEquals( this.streamingExtractor.nextWatermark.get(1), 1L);
+ Assert.assertEquals( this.streamingExtractor.nextWatermark.get(2), 0L);
+
+ //Checkpoint watermarks
+ this.streamingExtractor.onFlushAck();
+
+ //Reset extractor stats
+ this.streamingExtractor.resetExtractorStatsAndWatermarks(false);
+
+ //Ensure post-reset invariance is satisfied i.e. low watermark and next watermark are identical.
+ testAfterReset(highWatermark1);
+
+ MultiLongWatermark highWatermark2 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
+ //Read 1 more record
+ this.streamingExtractor.readStreamEntityImpl();
+ Assert.assertEquals( this.streamingExtractor.nextWatermark.get(0), 1L);
+ Assert.assertEquals( this.streamingExtractor.nextWatermark.get(1), 1L);
+ Assert.assertEquals( this.streamingExtractor.nextWatermark.get(2), 1L);
+
+ Assert.assertEquals( this.streamingExtractor.lowWatermark.get(0), 1L);
+ Assert.assertEquals( this.streamingExtractor.lowWatermark.get(1), 1L);
+ Assert.assertEquals( this.streamingExtractor.lowWatermark.get(2), 0L);
+
+ //Checkpoint watermarks
+ this.streamingExtractor.onFlushAck();
+
+ //Reset extractor stats
+ this.streamingExtractor.resetExtractorStatsAndWatermarks(false);
+
+ //Ensure post-reset invariance is satisfied.
+ testAfterReset(highWatermark2);
+ }
+
+ private void testAfterReset(MultiLongWatermark previousHighWatermark) {
+ //Ensure that low and next watermarks are identical after reset. Also ensure the new high watermark is greater than
+ // the previous high watermark.
+ for (int i=0; i < numPartitions; i++) {
+ Assert.assertEquals(this.streamingExtractor.lowWatermark.get(i), this.streamingExtractor.nextWatermark.get(i));
+ Assert.assertTrue(previousHighWatermark.get(i) <= this.streamingExtractor.highWatermark.get(i));
+ }
+ }
+
+ @Test
+ public void testGenerateAdditionalTagHelper() throws Exception {
+ // Verifying that produce rate has been added.
+ Map<KafkaPartition, Map<String, String>> result = this.streamingExtractor.getAdditionalTagsHelper();
+ for (Map<String, String> entry: result.values()) {
+ Assert.assertTrue(entry.containsKey(KafkaProduceRateTracker.KAFKA_PARTITION_PRODUCE_RATE_KEY));
+ }
+ }
+
+ static class TestDataPublisher extends DataPublisher {
+ public TestDataPublisher(WorkUnitState state) {
+ super(state);
+ }
+
+ @Override
+ public void initialize() {
+ }
+
+ @Override
+ public void publishData(Collection<? extends WorkUnitState> states) {
+
+ }
+
+ @Override
+ public void publishMetadata(Collection<? extends WorkUnitState> states) {
+
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
new file mode 100644
index 0000000..eb676d4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
+import org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class KafkaTopicGroupingWorkUnitPackerTest {
+ private Properties props;
+
+ @BeforeMethod
+ public void setUp() {
+ props = new Properties();
+ props.setProperty("gobblin.kafka.streaming.containerCapacity", "2");
+ props.setProperty("kafka.workunit.size.estimator.type", "CUSTOM");
+ props.setProperty("kafka.workunit.size.estimator.customizedType",
+ "org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator");
+ }
+
+ /**
+ * Check that capacity is honored.
+ */
+ @Test
+ public void testSingleTopic() {
+ KafkaSource source = new UniversalKafkaSource();
+ SourceState state = new SourceState(new State(props));
+ state.setProp("gobblin.kafka.streaming.enableIndexing", false);
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, Files.createTempDir().getAbsolutePath());
+
+ Map<String, List<WorkUnit>> workUnitsByTopic = ImmutableMap.of("topic1", Lists
+ .newArrayList(getWorkUnitWithTopicPartition("topic1", 1), getWorkUnitWithTopicPartition("topic1", 2),
+ getWorkUnitWithTopicPartition("topic1", 3)));
+
+ List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10);
+ Assert.assertEquals(workUnits.size(), 2);
+ Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1");
+ Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 1);
+ Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 1)), 2);
+ Assert.assertEquals(workUnits.get(1).getProp(KafkaSource.TOPIC_NAME), "topic1");
+ Assert.assertEquals(workUnits.get(1).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 3);
+ }
+
+ /**
+ * Check that topics are kept in separate work units.
+ */
+ @Test
+ public void testMultiTopic() {
+ KafkaSource source = new UniversalKafkaSource();
+ SourceState state = new SourceState(new State(props));
+ state.setProp("gobblin.kafka.streaming.enableIndexing", false);
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, Files.createTempDir().getAbsolutePath());
+
+ Map<String, List<WorkUnit>> workUnitsByTopic = ImmutableMap.of("topic1", Lists
+ .newArrayList(getWorkUnitWithTopicPartition("topic1", 1), getWorkUnitWithTopicPartition("topic1", 2),
+ getWorkUnitWithTopicPartition("topic1", 3)), "topic2", Lists
+ .newArrayList(getWorkUnitWithTopicPartition("topic2", 1), getWorkUnitWithTopicPartition("topic2", 2),
+ getWorkUnitWithTopicPartition("topic2", 3)));
+
+ List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10);
+ Assert.assertEquals(workUnits.size(), 4);
+
+ Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1");
+ Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 1);
+ Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 1)), 2);
+ Assert.assertEquals(workUnits.get(1).getProp(KafkaSource.TOPIC_NAME), "topic1");
+ Assert.assertEquals(workUnits.get(1).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 3);
+
+ Assert.assertEquals(workUnits.get(2).getProp(KafkaSource.TOPIC_NAME), "topic2");
+ Assert.assertEquals(workUnits.get(2).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 1);
+ Assert.assertEquals(workUnits.get(2).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 1)), 2);
+ Assert.assertEquals(workUnits.get(3).getProp(KafkaSource.TOPIC_NAME), "topic2");
+ Assert.assertEquals(workUnits.get(3).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 3);
+ }
+
+
+ public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) {
+ WorkUnit workUnit = new WorkUnit(new Extract(Extract.TableType.APPEND_ONLY, "kafka", topic));
+ workUnit.setProp(KafkaSource.TOPIC_NAME, topic);
+ workUnit.setProp(KafkaSource.PARTITION_ID, Integer.toString(partition));
+ workUnit.setProp(KafkaSource.LEADER_HOSTANDPORT, "host:1234");
+ workUnit.setProp(KafkaSource.LEADER_ID, "1");
+
+ workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, "0");
+ workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, "0");
+
+ workUnit.setProp("previousStartFetchEpochTime", "0");
+ workUnit.setProp("previousStopFetchEpochTime", "0");
+ workUnit.setProp("previousLowWatermark", "0");
+ workUnit.setProp("previousHighWatermark", "0");
+ workUnit.setProp("previousLatestOffset", "0");
+ workUnit.setProp("offsetFetchEpochTime", "0");
+ workUnit.setProp("previousOffsetFetchEpochTime", "0");
+
+ return workUnit;
+ }
+
+ @Test
+ public void testGetContainerCapacityForTopic() {
+ double delta = 0.000001; //Error tolerance limit for assertions involving double.
+ KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy strategy =
+ KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MIN;
+ List<Double> capacities = Arrays.asList(new Double[]{1.2, 1.4, 1.3, 1.4, 1.2});
+ double capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+ Assert.assertEquals(capacity, 1.2, delta);
+ strategy = KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MAX;
+ capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+ Assert.assertEquals(capacity, 1.4, delta);
+ strategy = KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MEDIAN;
+ capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+ Assert.assertEquals(capacity, 1.3, delta);
+ strategy = KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MEAN;
+ capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+ Assert.assertEquals(capacity, 1.3, delta);
+
+ //Validate the median for an even sized list
+ capacities = Arrays.asList(new Double[]{1.2, 1.4, 1.3, 1.4});
+ strategy = KafkaTopicGroupingWorkUnitPacker.ContainerCapacityComputationStrategy.MEDIAN;
+ capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
+ Assert.assertEquals(capacity, 1.35, delta);
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
new file mode 100644
index 0000000..d199295
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.gson.Gson;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
+
+ /**
+  * Tests {@link ProduceRateAndLagBasedWorkUnitSizeEstimator#calcEstimatedSize} for a work unit
+  * without a watermark, with a watermark carrying produce rates, and at a packing time whose
+  * day row in the produce-rate table holds only -1.0 entries.
+  */
+ public class ProduceRateAndLagBasedWorkUnitSizeEstimatorTest {
+   private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class);
+   private static final String TEST_TOPIC = "test";
+   private static final long AVG_RECORD_SIZE = 1024L;
+   private static final String BINPACKING_TIME_1 = "11/17/2019 23:10:00";
+   private static final String BINPACKING_TIME_2 = "11/19/2019 08:00:00";
+
+   private double[][] avgProduceRates = new double[7][24];
+   private ProduceRateAndLagBasedWorkUnitSizeEstimator estimator;
+
+   /**
+    * Fills the 7 x 24 produce-rate table: row 2 is set to -1.0 throughout; every other cell gets
+    * the next value of a counter starting at 1, scaled by ONE_MEGA_BYTE.
+    */
+   @BeforeClass
+   public void setUp() {
+     double nextRate = 1.0;
+     for (int day = 0; day < 7; day++) {
+       final boolean rowWithoutRates = (day == 2);
+       for (int hour = 0; hour < 24; hour++) {
+         if (rowWithoutRates) {
+           avgProduceRates[day][hour] = -1.0;
+           continue;
+         }
+         avgProduceRates[day][hour] = nextRate * ProduceRateAndLagBasedWorkUnitSizeEstimator.ONE_MEGA_BYTE;
+         nextRate++;
+       }
+     }
+   }
+
+   @Test
+   public void testCalcEstimatedSize() throws ParseException {
+     SourceState sourceState = new SourceState();
+     sourceState.setProp(ProduceRateAndLagBasedWorkUnitSizeEstimator.CATCHUP_SLA_IN_HOURS_KEY, 3);
+     sourceState.setProp(ProduceRateAndLagBasedWorkUnitSizeEstimator.REPLANNING_INTERVAL_IN_HOURS_KEY, 3);
+     sourceState.setProp(ProduceRateAndLagBasedWorkUnitSizeEstimator.PRODUCE_RATE_SCALING_FACTOR_KEY, 1);
+     this.estimator = new ProduceRateAndLagBasedWorkUnitSizeEstimator(sourceState);
+
+     SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
+     format.setTimeZone(TimeZone.getDefault());
+     final long firstPackingTimeMillis = format.parse(BINPACKING_TIME_1).getTime();
+     final long secondPackingTimeMillis = format.parse(BINPACKING_TIME_2).getTime();
+
+     //WorkUnit with no KafkaWatermark
+     KafkaStreamingExtractor.KafkaWatermark watermark = null;
+     WorkUnit workUnit = WorkUnit.createEmpty();
+     workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK, GSON.toJson(watermark));
+     workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY, 1.0);
+     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, Long.toString(6 * 3600 * 1024));
+     workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS, firstPackingTimeMillis);
+     Assert.assertEquals(estimatedSizeAsLong(workUnit), 1L);
+
+     //WorkUnit with Kafka watermark and previous avg produce rates
+     KafkaPartition partition = new KafkaPartition.Builder().withTopicName(TEST_TOPIC).withId(0).build();
+     watermark = new KafkaStreamingExtractor.KafkaWatermark(partition, new LongWatermark(0L));
+     watermark.setAvgRecordSize(AVG_RECORD_SIZE);
+     watermark.setAvgProduceRates(avgProduceRates);
+     workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK, GSON.toJson(watermark));
+     Assert.assertEquals(estimatedSizeAsLong(workUnit), 29L);
+
+     //WorkUnit with Kafka watermark but no previous avg produce rates
+     workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS, secondPackingTimeMillis);
+     workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY, 2.0);
+     Assert.assertEquals(estimatedSizeAsLong(workUnit), 4L);
+   }
+
+   /** Runs the estimator on the work unit and truncates the estimate to a long for comparison. */
+   private long estimatedSizeAsLong(WorkUnit workUnit) {
+     return Double.valueOf(this.estimator.calcEstimatedSize(workUnit)).longValue();
+   }
+ }
\ No newline at end of file
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index e6d1785..13c883e 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -38,6 +38,7 @@ compileJava {
dependencies {
compile project(":gobblin-api")
compile project(":gobblin-core")
+ compile project(":gobblin-core-base")
compile project(":gobblin-hive-registration")
compile project(":gobblin-metrics-libs:gobblin-metrics")
compile project(":gobblin-metastore")
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index a9c0ac4..b7c87d1 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -78,6 +78,7 @@ ext.externalDependency = [
"jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
"jmh": "org.openjdk.jmh:jmh-core:1.17.3",
"jmhAnnotations": "org.openjdk.jmh:jmh-generator-annprocess:1.17.3",
+ "jollyday": "de.jollyday:jollyday:0.4.9",
"kafka08": "org.apache.kafka:kafka_2.11:" + kafka08Version,
"kafka08Test": "org.apache.kafka:kafka_2.11:" + kafka08Version + ":test",
"kafka08Client": "org.apache.kafka:kafka-clients:" + kafka08Version,
@@ -174,13 +175,13 @@ ext.externalDependency = [
/**
* Avoiding conflicts with Hive 1.x versions existed in the classpath
*/
- "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.3:nohive",
- "orcCore": "org.apache.orc:orc-core:1.6.3:nohive",
- "orcTools":"org.apache.orc:orc-tools:1.6.3",
- 'parquet': 'org.apache.parquet:parquet-hadoop:1.11.0',
- 'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.0',
- 'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.0',
- 'parquetHadoop': 'org.apache.parquet:parquet-hadoop-bundle:1.11.0',
+ "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.5:nohive",
+ "orcCore": "org.apache.orc:orc-core:1.6.5:nohive",
+ "orcTools":"org.apache.orc:orc-tools:1.6.5",
+ 'parquet': 'org.apache.parquet:parquet-hadoop:1.11.1',
+ 'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.1',
+ 'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.1',
+ 'parquetHadoop': 'org.apache.parquet:parquet-hadoop-bundle:1.11.1',
'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
"slf4j": [
"org.slf4j:slf4j-api:" + slf4jVersion,