You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/11 23:14:26 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1040] HighLevelConsumer re-design by removing references to …
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7a328f9 [GOBBLIN-1040] HighLevelConsumer re-design by removing references to …
7a328f9 is described below
commit 7a328f9a232a60973d27c50859e6b84e63df90f7
Author: vbohra <vb...@linkedin.com>
AuthorDate: Wed Mar 11 16:14:19 2020 -0700
[GOBBLIN-1040] HighLevelConsumer re-design by removing references to …
Closes #2900 from vikrambohra/GOBBLIN-1040
---
.../kafka/client/Kafka08ConsumerClient.java | 11 +-
.../gobblin/service/AvroJobSpecDeserializer.java | 0
.../gobblin/metrics/reporter/KafkaTestBase.java | 1 +
.../kafka/KafkaDeserializerExtractorTest.java | 2 +-
gobblin-modules/gobblin-kafka-09/build.gradle | 18 +-
.../kafka/client/Kafka09ConsumerClient.java | 99 ++++++++-
.../gobblin/service/AvroJobSpecDeserializer.java | 0
.../org/apache/gobblin/kafka/KafkaTestBase.java | 4 +
.../gobblin/runtime/HighLevelConsumerTest.java | 176 +++++++++++++++
.../gobblin/runtime}/KafkaAvroJobMonitorTest.java | 18 +-
.../runtime}/KafkaAvroJobStatusMonitorTest.java | 66 ++++--
.../gobblin/runtime/KafkaJobMonitorTest.java | 127 +++++++++++
.../runtime}/SLAEventKafkaJobMonitorTest.java | 41 +++-
.../service}/GobblinServiceManagerTest.java | 38 +++-
.../service/StreamingKafkaSpecExecutorTest.java | 30 ++-
.../kafka/client/BaseKafkaConsumerRecord.java | 12 +
....java => GobblinConsumerRebalanceListener.java} | 42 ++--
.../kafka/client/GobblinKafkaConsumerClient.java | 49 +++-
.../gobblin/kafka/client/KafkaConsumerRecord.java | 19 ++
.../kafka/writer/KafkaWriterConfigurationKeys.java | 4 +-
gobblin-runtime/build.gradle | 1 -
.../job_monitor/AvroJobSpecKafkaJobMonitor.java | 2 +
.../runtime/job_monitor/KafkaAvroJobMonitor.java | 3 +-
.../runtime/job_monitor/KafkaJobMonitor.java | 12 +-
.../gobblin/runtime/kafka/HighLevelConsumer.java | 246 +++++++++++++++------
.../runtime/job_monitor/KafkaJobMonitorTest.java | 65 ------
.../runtime/job_monitor/MockKafkaStream.java | 106 ---------
.../runtime/job_monitor/MockedKafkaJobMonitor.java | 28 +--
.../runtime/kafka/HighLevelConsumerTest.java | 82 -------
.../runtime/kafka/MockedHighLevelConsumer.java | 53 ++---
gobblin-service/build.gradle | 4 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 10 +-
32 files changed, 895 insertions(+), 474 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
index 2ed5b70..8d9efbc 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
@@ -247,6 +247,11 @@ public class Kafka08ConsumerClient extends AbstractBaseKafkaConsumerClient {
}
}
+ @Override
+ public Iterator<KafkaConsumerRecord> consume() {
+ throw new UnsupportedOperationException("consume() not supported by " + this.getClass().getSimpleName() + " Please use Kafka09ConsumerClient or above");
+ }
+
private synchronized FetchResponse getFetchResponseForFetchRequest(FetchRequest fetchRequest, KafkaPartition partition) {
SimpleConsumer consumer = getSimpleConsumer(partition.getLeader().getHostAndPort());
@@ -265,7 +270,7 @@ public class Kafka08ConsumerClient extends AbstractBaseKafkaConsumerClient {
new Function<kafka.message.MessageAndOffset, KafkaConsumerRecord>() {
@Override
public KafkaConsumerRecord apply(kafka.message.MessageAndOffset input) {
- return new Kafka08ConsumerRecord(input);
+ return new Kafka08ConsumerRecord(input, partition.getTopicName(), partition.getId());
}
});
} catch (Exception e) {
@@ -345,8 +350,8 @@ public class Kafka08ConsumerClient extends AbstractBaseKafkaConsumerClient {
private final MessageAndOffset messageAndOffset;
- public Kafka08ConsumerRecord(MessageAndOffset messageAndOffset) {
- super(messageAndOffset.offset(), messageAndOffset.message().size());
+ public Kafka08ConsumerRecord(MessageAndOffset messageAndOffset, String topic, int partition) {
+ super(messageAndOffset.offset(), messageAndOffset.message().size(), topic, partition);
this.messageAndOffset = messageAndOffset;
}
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
similarity index 100%
copy from gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
copy to gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaTestBase.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaTestBase.java
index 11aa603..65040d0 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaTestBase.java
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaTestBase.java
@@ -66,6 +66,7 @@ public class KafkaTestBase implements Closeable {
if (serverStarted && serverClosed) {
throw new RuntimeException("Kafka test server has already been closed. Cannot generate Kafka server twice.");
}
+
if (!serverStarted) {
serverStarted = true;
zkConnect = TestZKUtils.zookeeperConnect();
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
index 1a273ec..06b06e0 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
@@ -297,7 +297,7 @@ public class KafkaDeserializerExtractorTest {
Message mockMessage = mock(Message.class);
when(mockMessage.payload()).thenReturn(payload);
when(mockMessageAndOffset.message()).thenReturn(mockMessage);
- return new Kafka08ConsumerRecord(mockMessageAndOffset);
+ return new Kafka08ConsumerRecord(mockMessageAndOffset, "test", 0);
}
@AllArgsConstructor
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle b/gobblin-modules/gobblin-kafka-09/build.gradle
index 5978d96..b33eb55 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -18,6 +18,7 @@
apply plugin: 'java'
dependencies {
+ compile project(":gobblin-runtime")
compile project(":gobblin-modules:gobblin-kafka-common")
compile project(":gobblin-core-base")
compile project(":gobblin-utility")
@@ -49,7 +50,10 @@ dependencies {
runtime externalDependency.confluentSchemaRegistryClient
runtime externalDependency.protobuf
- testCompile project(":gobblin-runtime")
+ testCompile project(":gobblin-service")
+ testCompile project(":gobblin-modules:gobblin-service-kafka")
+ testCompile project(path: ":gobblin-runtime", configuration: "tests")
+ testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
testCompile project(":gobblin-test-utils")
testCompile externalDependency.jsonAssert
testCompile externalDependency.mockito
@@ -59,6 +63,8 @@ dependencies {
exclude group: "com.sun.jdmk", module: "jmxtools"
exclude group: "javax.jms", module: "jms"
}
+ testCompile externalDependency.pegasus.data
+ testCompile externalDependency.pegasus.restliClient
}
configurations {
@@ -68,6 +74,16 @@ configurations {
// HADOOP-5254 and MAPREDUCE-5664
all*.exclude group: 'xml-apis'
all*.exclude group: 'xerces'
+ tests
+}
+
+task testJar(type: Jar, dependsOn: testClasses) {
+ baseName = "test-${project.archivesBaseName}"
+ from sourceSets.test.output
+}
+
+artifacts {
+ tests testJar
}
test {
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index 97581c6..931cf3d 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.kafka.client;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -24,11 +25,17 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -39,6 +46,7 @@ import com.codahale.metrics.Metric;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
@@ -48,6 +56,7 @@ import com.typesafe.config.ConfigFactory;
import javax.annotation.Nonnull;
import lombok.EqualsAndHashCode;
+import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@@ -76,7 +85,7 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
private static final String KAFKA_09_CLIENT_GROUP_ID = "group.id";
private static final String KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false);
- private static final String KAFKA_09_DEFAULT_KEY_DESERIALIZER =
+ public static final String KAFKA_09_DEFAULT_KEY_DESERIALIZER =
"org.apache.kafka.common.serialization.StringDeserializer";
private static final String KAFKA_09_DEFAULT_GROUP_ID = "kafka09";
@@ -160,12 +169,53 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
- ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
- return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
+ return consume();
+ }
+
+ @Override
+ public Iterator<KafkaConsumerRecord> consume() {
+ try {
+ ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
+
+ return Iterators.transform(consumerRecords.iterator(), input -> {
+ try {
+ return new Kafka09ConsumerRecord(input);
+ } catch (Throwable t) {
+ throw Throwables.propagate(t);
+ }
+ });
+ } catch (Exception e) {
+ log.error("Exception on polling records", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Subscribe to a kafka topic
+ * TODO Add multi topic support
+ * @param topic
+ */
+ @Override
+ public void subscribe(String topic) {
+ this.consumer.subscribe(Lists.newArrayList(topic), new NoOpConsumerRebalanceListener());
+ }
+ /**
+ * Subscribe to a kafka topic with a {#GobblinConsumerRebalanceListener}
+ * TODO Add multi topic support
+ * @param topic
+ */
+ @Override
+ public void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
+ this.consumer.subscribe(Lists.newArrayList(topic), new ConsumerRebalanceListener() {
@Override
- public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
- return new Kafka09ConsumerRecord<>(input);
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ listener.onPartitionsRevoked(partitions.stream().map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build()).collect(Collectors.toList()));
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ listener.onPartitionsAssigned(partitions.stream().map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build()).collect(Collectors.toList()));
}
});
}
@@ -181,6 +231,43 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
}
/**
+ * Commit offsets to Kafka asynchronously
+ */
+ @Override
+ public void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
+ Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(),e.getKey().getId()), e -> new OffsetAndMetadata(e.getValue())));
+ consumer.commitAsync(offsets, new OffsetCommitCallback() {
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+ if(exception != null) {
+ log.error("Exception while committing offsets " + offsets, exception);
+ return;
+ }
+ }
+ });
+ }
+
+ /**
+ * Commit offsets to Kafka synchronously
+ */
+ @Override
+ public void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
+ Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(),e.getKey().getId()), e -> new OffsetAndMetadata(e.getValue())));
+ consumer.commitSync(offsets);
+ }
+
+ /**
+ * returns the last committed offset for a KafkaPartition
+ * @param partition
+ * @return last committed offset or -1 for invalid KafkaPartition
+ */
+ @Override
+ public long committed(KafkaPartition partition) {
+ OffsetAndMetadata offsetAndMetadata = consumer.committed(new TopicPartition(partition.getTopicName(), partition.getId()));
+ return offsetAndMetadata != null ? offsetAndMetadata.offset() : -1l;
+ }
+
+ /**
* Convert a {@link KafkaMetric} instance to a {@link Metric}.
* @param kafkaMetric
* @return
@@ -240,7 +327,7 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
public Kafka09ConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
// Kafka 09 consumerRecords do not provide value size.
// Only 08 and 10 versions provide them.
- super(consumerRecord.offset(), BaseKafkaConsumerRecord.VALUE_SIZE_UNAVAILABLE);
+ super(consumerRecord.offset(), BaseKafkaConsumerRecord.VALUE_SIZE_UNAVAILABLE, consumerRecord.topic(), consumerRecord.partition());
this.consumerRecord = consumerRecord;
}
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
similarity index 100%
rename from gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
rename to gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
index f99680c..2eb3fa7 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
@@ -265,5 +265,9 @@ public class KafkaTestBase implements Closeable {
public int getKafkaServerPort() {
return _kafkaServerSuite.getKafkaServerPort();
}
+
+ public String getZkConnectString() {
+ return this._kafkaServerSuite.getZkConnectString();
+ }
}
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
new file mode 100644
index 0000000..75c667b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.io.File;
+import java.util.List;
+import java.util.Properties;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
+import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.kafka.MockedHighLevelConsumer;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.WriteCallback;
+
+
+@Test
+public class HighLevelConsumerTest extends KafkaTestBase {
+
+ private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+ private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
+ private static final String SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT = AbstractBaseKafkaConsumerClient.CONFIG_NAMESPACE + "." + AbstractBaseKafkaConsumerClient.CONSUMER_CONFIG + ".";
+ private static final String TOPIC = HighLevelConsumerTest.class.getSimpleName();
+ private static final int NUM_PARTITIONS = 2;
+ private static final int NUM_MSGS = 10;
+
+ private Closer _closer;
+ private String _kafkaBrokers;
+ private AsyncDataWriter dataWriter;
+
+ public HighLevelConsumerTest()
+ throws InterruptedException, RuntimeException {
+ super();
+ _kafkaBrokers = "localhost:" + this.getKafkaServerPort();
+ }
+
+ @BeforeSuite
+ public void beforeSuite() throws Exception {
+ startServers();
+ _closer = Closer.create();
+ Properties producerProps = new Properties();
+ producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
+ producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + BOOTSTRAP_SERVERS_KEY, _kafkaBrokers);
+ producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, this.getZkConnectString());
+ producerProps.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(NUM_PARTITIONS));
+ dataWriter = _closer.register(new Kafka09DataWriter(producerProps));
+
+ List<byte[]> records = createByteArrayMessages(NUM_MSGS);
+ WriteCallback mock = Mockito.mock(WriteCallback.class);
+ for(byte[] record : records) {
+ dataWriter.write(record, mock);
+ }
+ dataWriter.flush();
+ }
+
+ public static Config getSimpleConfig(Optional<String> prefix) {
+ Properties properties = new Properties();
+ properties.put(getConfigKey(prefix, ConfigurationKeys.KAFKA_BROKERS), "localhost:1234");
+ properties.put(getConfigKey(prefix, Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY), Kafka09ConsumerClient.KAFKA_09_DEFAULT_KEY_DESERIALIZER);
+ properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper");
+ properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true");
+ File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
+ properties.put(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, tmpDir.toString());
+
+ return ConfigFactory.parseProperties(properties);
+ }
+
+ private static String getConfigKey(Optional<String> prefix, String key) {
+ return prefix.isPresent() ? prefix.get() + "." + key : key;
+ }
+
+ @Test
+ public void testConsumerAutoOffsetCommit() throws Exception {
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+ consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+ consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+ MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS);
+ consumer.startAsync().awaitRunning();
+
+ consumer.awaitExactlyNMessages(NUM_MSGS, 5000);
+ consumer.shutDown();
+ }
+
+ @Test
+ public void testConsumerManualOffsetCommit() throws Exception {
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+ consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+
+ // Setting this to a second to make sure we are committing offsets frequently
+ consumerProps.put(HighLevelConsumer.OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY, 1);
+
+ MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps),
+ NUM_PARTITIONS);
+ consumer.startAsync().awaitRunning();
+
+ consumer.awaitExactlyNMessages(NUM_MSGS, 5000);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ }
+
+ GobblinKafkaConsumerClient client = consumer.getGobblinKafkaConsumerClient();
+ for(int i=0; i< NUM_PARTITIONS; i++) {
+ KafkaPartition partition = new KafkaPartition.Builder().withTopicName(TOPIC).withId(i).build();
+ Assert.assertTrue(consumer.getCommittedOffsets().containsKey(partition));
+ }
+ consumer.shutDown();
+ }
+
+ private List<byte[]> createByteArrayMessages(int numMsgs) {
+ List<byte[]> records = Lists.newArrayList();
+
+ for(int i=0; i<numMsgs; i++) {
+ byte[] msg = ("msg_" + i).getBytes();
+ records.add(msg);
+ }
+ return records;
+ }
+
+ @AfterSuite
+ public void afterSuite() {
+ try {
+ _closer.close();
+ } catch (Exception e) {
+ System.out.println("Failed to close data writer." + e);
+ } finally {
+ try {
+ close();
+ } catch (Exception e) {
+ System.out.println("Failed to close Kafka server."+ e);
+ }
+ }
+ }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
similarity index 92%
rename from gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitorTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
index 416e237..2e63cc2 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.job_monitor;
+package org.apache.gobblin.runtime;
+import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
@@ -38,7 +39,8 @@ import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.NoopSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
+import org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
import org.apache.gobblin.util.Either;
@@ -134,6 +136,18 @@ public class KafkaAvroJobMonitorTest {
this.events.add(message);
return Lists.newArrayList(Either.<JobSpec, URI>left(JobSpec.builder(message.getName()).build()));
}
+
+ @Override
+ protected void buildMetricsContextAndMetrics() {
+ super.buildMetricsContextAndMetrics();
+ }
+
+ @Override
+ protected void shutdownMetrics()
+ throws IOException {
+ super.shutdownMetrics();
+ }
+
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
similarity index 84%
rename from gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index eb68cf4..05dc4c3 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.service.monitoring;
+package org.apache.gobblin.runtime;
import java.io.File;
import java.io.IOException;
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -43,6 +44,8 @@ import kafka.message.MessageAndMetadata;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
@@ -52,6 +55,9 @@ import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
import org.apache.gobblin.metrics.kafka.Pusher;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
public class KafkaAvroJobStatusMonitorTest {
@@ -123,14 +129,16 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- Config config = ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+ Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+ .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
.withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
- KafkaJobStatusMonitor jobStatusMonitor = new KafkaAvroJobStatusMonitor("test",config, 1);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
StateStore stateStore = jobStatusMonitor.getStateStore();
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
@@ -141,7 +149,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
stateList = stateStore.getAll(storeName, tableName);
@@ -150,7 +158,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -158,7 +166,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -170,12 +178,14 @@ public class KafkaAvroJobStatusMonitorTest {
// Check that state didn't get set to running since it was already complete
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
state = stateList.get(0);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+
+ jobStatusMonitor.shutDown();
}
@Test
@@ -222,14 +232,16 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- Config config = ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+ Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+ .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
.withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
- KafkaJobStatusMonitor jobStatusMonitor = new KafkaAvroJobStatusMonitor("test",config, 1);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
StateStore stateStore = jobStatusMonitor.getStateStore();
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
@@ -240,7 +252,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
stateList = stateStore.getAll(storeName, tableName);
@@ -249,7 +261,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -257,7 +269,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -267,7 +279,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -276,7 +288,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -285,7 +297,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
messageAndMetadata = iterator.next();
- jobStatusMonitor.processMessage(messageAndMetadata);
+ jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
stateList = stateStore.getAll(storeName, tableName);
Assert.assertEquals(stateList.size(), 1);
@@ -293,6 +305,8 @@ public class KafkaAvroJobStatusMonitorTest {
//Because the maximum attempt is set to 2, so the state is set to Failed after trying twice
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.FAILED.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(false));
+
+ jobStatusMonitor.shutDown();
}
private GobblinTrackingEvent createFlowCompiledEvent() {
@@ -405,6 +419,11 @@ public class KafkaAvroJobStatusMonitorTest {
return event;
}
+ private DecodeableKafkaRecord convertMessageAndMetadataToDecodableKafkaRecord(MessageAndMetadata messageAndMetadata) {
+ ConsumerRecord consumerRecord = new ConsumerRecord<>(TOPIC, messageAndMetadata.partition(), messageAndMetadata.offset(), messageAndMetadata.key(), messageAndMetadata.message());
+ return new Kafka09ConsumerClient.Kafka09ConsumerRecord(consumerRecord);
+ }
+
private void cleanUpDir(String dir) throws Exception {
File specStoreDir = new File(dir);
if (specStoreDir.exists()) {
@@ -429,4 +448,17 @@ public class KafkaAvroJobStatusMonitorTest {
System.err.println("Failed to close Kafka server.");
}
}
+
+ class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor {
+
+ public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
+ throws IOException, ReflectiveOperationException {
+ super(topic, config, numThreads);
+ }
+
+ @Override
+ protected void processMessage(DecodeableKafkaRecord record) {
+ super.processMessage(record);
+ }
+ }
}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaJobMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaJobMonitorTest.java
new file mode 100644
index 0000000..f47bae4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaJobMonitorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Closer;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
+import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
+import org.apache.gobblin.runtime.job_monitor.MockedKafkaJobMonitor;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.WriteCallback;
+
+
+public class KafkaJobMonitorTest extends KafkaTestBase {
+
+ private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+ private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
+ private static final String SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT = AbstractBaseKafkaConsumerClient.CONFIG_NAMESPACE + "." + AbstractBaseKafkaConsumerClient.CONSUMER_CONFIG + ".";
+ private static final String TOPIC = KafkaJobMonitorTest.class.getSimpleName();
+ private static final int NUM_PARTITIONS = 2;
+
+ private Closer _closer;
+ private String _kafkaBrokers;
+ private AsyncDataWriter dataWriter;
+
+ public KafkaJobMonitorTest()
+ throws InterruptedException, RuntimeException {
+ super();
+ _kafkaBrokers = "localhost:" + this.getKafkaServerPort();
+ }
+
+ @BeforeSuite
+ public void beforeSuite() throws Exception {
+ startServers();
+ _closer = Closer.create();
+ Properties producerProps = new Properties();
+ producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
+ producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + BOOTSTRAP_SERVERS_KEY, _kafkaBrokers);
+ producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, this.getZkConnectString());
+ producerProps.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(NUM_PARTITIONS));
+ dataWriter = _closer.register(new Kafka09DataWriter(producerProps));
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ Properties consumerProps = new Properties();
+ consumerProps.put(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+ consumerProps.put(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.setProperty(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+
+ MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(TOPIC, ConfigUtils.propertiesToConfig(consumerProps));
+ monitor.startAsync().awaitRunning();
+
+ WriteCallback mockCallback = Mockito.mock(WriteCallback.class);
+ dataWriter.write("job1:1".getBytes(), mockCallback);
+ monitor.awaitExactlyNSpecs(1);
+ Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job1")));
+ Assert.assertEquals(monitor.getJobSpecs().get(new URI("job1")).getVersion(), "1");
+
+ dataWriter.write("job2:1".getBytes(), mockCallback);
+ monitor.awaitExactlyNSpecs(2);
+ Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
+ Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "1");
+
+ dataWriter.write((MockedKafkaJobMonitor.REMOVE + ":job1").getBytes(), mockCallback);
+ monitor.awaitExactlyNSpecs(1);
+ Assert.assertFalse(monitor.getJobSpecs().containsKey(new URI("job1")));
+ Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
+
+ dataWriter.write(("job2:2,job1:2").getBytes(), mockCallback);
+ monitor.awaitExactlyNSpecs(2);
+ Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job1")));
+ Assert.assertEquals(monitor.getJobSpecs().get(new URI("job1")).getVersion(), "2");
+ Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
+ Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "2");
+
+ monitor.shutDown();
+ }
+
+ @AfterSuite
+ public void afterSuite() {
+ try {
+ _closer.close();
+ } catch (Exception e) {
+ System.out.println("Failed to close data writer." + e);
+ } finally {
+ try {
+ close();
+ } catch (Exception e) {
+ System.out.println("Failed to close Kafka server."+ e);
+ }
+ }
+ }
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
similarity index 83%
rename from gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitorTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
index 5c278c9..74564b7 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.job_monitor;
+package org.apache.gobblin.runtime;
+import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
@@ -36,8 +37,11 @@ import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.NoopSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
+import org.apache.gobblin.runtime.job_monitor.SLAEventKafkaJobMonitor;
import org.apache.gobblin.util.Either;
@@ -54,8 +58,8 @@ public class SLAEventKafkaJobMonitorTest {
@Test
public void testParseJobSpec() throws Exception {
- SLAEventKafkaJobMonitor monitor =
- new SLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
+ MockSLAEventKafkaJobMonitor monitor =
+ new MockSLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)),
new NoopSchemaVersionWriter(), Optional.<Pattern>absent(), Optional.<Pattern>absent(), this.templateURI,
ImmutableMap.of("metadataKey1", "key1"));
@@ -79,8 +83,8 @@ public class SLAEventKafkaJobMonitorTest {
@Test
public void testFilterByName() throws Exception {
- SLAEventKafkaJobMonitor monitor =
- new SLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
+ MockSLAEventKafkaJobMonitor monitor =
+ new MockSLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)),
new NoopSchemaVersionWriter(), Optional.<Pattern>absent(), Optional.of(Pattern.compile("^accept.*")),
this.templateURI, ImmutableMap.<String, String>of());
@@ -110,8 +114,8 @@ public class SLAEventKafkaJobMonitorTest {
props.put(SLAEventKafkaJobMonitor.DATASET_URN_FILTER_KEY, "^/accept.*");
Config config = ConfigFactory.parseProperties(props).withFallback(superConfig);
- SLAEventKafkaJobMonitor monitor =
- new SLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
+ MockSLAEventKafkaJobMonitor monitor =
+ new MockSLAEventKafkaJobMonitor("topic", null, new URI("/base/URI"),
HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)),
new NoopSchemaVersionWriter(), Optional.of(Pattern.compile("^/accept.*")), Optional.<Pattern>absent(),
this.templateURI, ImmutableMap.<String, String>of());
@@ -171,4 +175,25 @@ public class SLAEventKafkaJobMonitorTest {
return new GobblinTrackingEvent(0L, "namespace", name, metadata);
}
+ class MockSLAEventKafkaJobMonitor extends SLAEventKafkaJobMonitor {
+
+ protected MockSLAEventKafkaJobMonitor(String topic, MutableJobCatalog catalog, URI baseURI,
+ Config limitedScopeConfig, SchemaVersionWriter<?> versionWriter, Optional<Pattern> urnFilter,
+ Optional<Pattern> nameFilter, URI template, Map<String, String> extractKeys)
+ throws IOException {
+ super(topic, catalog, baseURI, limitedScopeConfig, versionWriter, urnFilter, nameFilter, template, extractKeys);
+ }
+
+ @Override
+ protected void buildMetricsContextAndMetrics() {
+ super.buildMetricsContextAndMetrics();
+ }
+
+ @Override
+ protected void shutdownMetrics()
+ throws IOException {
+ super.shutdownMetrics();
+ }
+ }
+
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
similarity index 93%
rename from gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 7aecfb1..125c2bf 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service;
import java.io.File;
import java.net.URI;
@@ -25,6 +25,8 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+
+import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
@@ -47,6 +49,7 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.linkedin.data.template.StringMap;
import com.linkedin.restli.client.RestLiResponseException;
+import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.KafkaTestBase;
@@ -55,11 +58,8 @@ import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.service.FlowConfig;
-import org.apache.gobblin.service.FlowConfigClient;
-import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.Schedule;
-import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GitConfigMonitor;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
@@ -89,7 +89,7 @@ public class GobblinServiceManagerTest {
private static final String TEST_SOURCE_NAME = "testSource";
private static final String TEST_SINK_NAME = "testSink";
- private GobblinServiceManager gobblinServiceManager;
+ private MockGobblinServiceManager gobblinServiceManager;
private FlowConfigClient flowConfigClient;
private Git gitForPush;
@@ -143,12 +143,12 @@ public class GobblinServiceManagerTest {
this.gitForPush.commit().setMessage("First commit").call();
this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call();
- this.gobblinServiceManager = new GobblinServiceManager("CoreService", "1",
+ this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1",
ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
this.gobblinServiceManager.start();
this.flowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/",
- this.gobblinServiceManager.restliServer.getPort()));
+ this.gobblinServiceManager.getRestLiServer().getPort()));
}
private void cleanUpDir(String dir) throws Exception {
@@ -192,7 +192,7 @@ public class GobblinServiceManagerTest {
.setProperties(new StringMap(flowProperties));
this.flowConfigClient.createFlowConfig(flowConfig);
- Assert.assertTrue(this.gobblinServiceManager.flowCatalog.getSpecs().size() == 1, "Flow that was created is not "
+ Assert.assertTrue(this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1, "Flow that was created is not "
+ "reflecting in FlowCatalog");
}
@@ -289,7 +289,7 @@ public class GobblinServiceManagerTest {
Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n", testFlowFile, Charsets.UTF_8);
- Collection<Spec> specs = this.gobblinServiceManager.flowCatalog.getSpecs();
+ Collection<Spec> specs = this.gobblinServiceManager.getFlowCatalog().getSpecs();
Assert.assertTrue(specs.size() == 0);
// add, commit, push
@@ -300,7 +300,7 @@ public class GobblinServiceManagerTest {
// polling is every 5 seconds, so wait twice as long and check
TimeUnit.SECONDS.sleep(10);
- specs = this.gobblinServiceManager.flowCatalog.getSpecs();
+ specs = this.gobblinServiceManager.getFlowCatalog().getSpecs();
Assert.assertTrue(specs.size() == 1);
FlowSpec spec = (FlowSpec) (specs.iterator().next());
@@ -356,4 +356,18 @@ public class GobblinServiceManagerTest {
}
cleanUpDir(FLOW_SPEC_STORE_DIR);
}
+
+ class MockGobblinServiceManager extends GobblinServiceManager {
+
+ public MockGobblinServiceManager(String serviceName, String serviceId, Config config,
+ Optional<Path> serviceWorkDirOptional)
+ throws Exception {
+ super(serviceName, serviceId, config, serviceWorkDirOptional);
+ }
+
+ protected EmbeddedRestliServer getRestLiServer() {
+ return this.restliServer;
+ }
+
+ }
}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
similarity index 86%
rename from gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
rename to gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
index e9c7ee6..26d3e3b 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service;
import java.io.File;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.net.URI;
import java.util.List;
import java.util.Map;
@@ -27,18 +28,21 @@ import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
-import org.apache.gobblin.metrics.reporter.KafkaTestBase;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.WriteResponse;
import org.apache.gobblin.runtime.api.SpecExecutor;
@@ -60,10 +64,16 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
+ @BeforeSuite
+ public void beforeSuite() {
+ log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+ startServers();
+ }
+
public StreamingKafkaSpecExecutorTest()
throws InterruptedException, RuntimeException {
- super(TOPIC);
- _kafkaBrokers = "localhost:" + kafkaPort;
+ super();
+ _kafkaBrokers = "localhost:" + this.getKafkaServerPort();
log.info("Going to use Kakfa broker: " + _kafkaBrokers);
cleanupTestDir();
@@ -88,13 +98,16 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
// Properties for Producer
_properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
+ _properties.setProperty("spec.kafka.dataWriterClass", "org.apache.gobblin.kafka.writer.Kafka09DataWriter");
_properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers);
_properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// Properties for Consumer
- _properties.setProperty("jobSpecMonitor.kafka.zookeeper.connect", zkConnect);
+ _properties.setProperty(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+ _properties.setProperty(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
_properties.setProperty(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY, TOPIC);
_properties.setProperty("gobblin.cluster.jobconf.fullyQualifiedPath", _JOBS_DIR_PATH);
+ _properties.setProperty(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX + "." + Kafka09ConsumerClient.CONFIG_PREFIX + Kafka09ConsumerClient.CONSUMER_CONFIG + ".auto.offset.reset", "earliest");
Config config = ConfigUtils.propertiesToConfig(_properties);
@@ -164,7 +177,7 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
.build();
}
- @AfterClass
+ @AfterSuite
public void after() {
try {
_closer.close();
@@ -183,9 +196,4 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
cleanupTestDir();
}
-
- @AfterSuite
- public void afterSuite() {
- closeServer();
- }
}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
index 017a166..65f84be 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
@@ -30,6 +30,8 @@ public abstract class BaseKafkaConsumerRecord implements KafkaConsumerRecord {
private final long offset;
private final long valueSizeInBytes;
+ private final String topic;
+ private final int partitionId;
public static final long VALUE_SIZE_UNAVAILABLE = -1l;
@Override
@@ -46,4 +48,14 @@ public abstract class BaseKafkaConsumerRecord implements KafkaConsumerRecord {
public long getValueSizeInBytes() {
return this.valueSizeInBytes;
}
+
+ @Override
+ public int getPartition() {
+ return this.partitionId;
+ }
+
+ @Override
+ public String getTopic() {
+ return this.topic;
+ }
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinConsumerRebalanceListener.java
similarity index 56%
copy from gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
copy to gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinConsumerRebalanceListener.java
index 017a166..334dcd4 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/BaseKafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinConsumerRebalanceListener.java
@@ -14,36 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.gobblin.kafka.client;
-import lombok.AllArgsConstructor;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
+import java.util.Collection;
+
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+
/**
- * A base {@link KafkaConsumerRecord} that with offset and valueSizeInBytes
+ * A listener that is called when Kafka partitions are re-assigned, e.g. when a consumer leaves/joins a group
+ *
+ * For more details, See https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
*/
-@AllArgsConstructor
-@EqualsAndHashCode
-@ToString
-public abstract class BaseKafkaConsumerRecord implements KafkaConsumerRecord {
-
- private final long offset;
- private final long valueSizeInBytes;
- public static final long VALUE_SIZE_UNAVAILABLE = -1l;
-
- @Override
- public long getOffset() {
- return this.offset;
- }
-
- @Override
- public long getNextOffset() {
- return this.offset + 1l;
- }
-
- @Override
- public long getValueSizeInBytes() {
- return this.valueSizeInBytes;
- }
+
+public interface GobblinConsumerRebalanceListener {
+
+ void onPartitionsRevoked(Collection<KafkaPartition> partitions);
+
+ void onPartitionsAssigned(Collection<KafkaPartition> partitions);
+
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index 4026f33..684a48c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.kafka.client;
import java.io.Closeable;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -94,7 +95,7 @@ public interface GobblinKafkaConsumerClient extends Closeable {
/**
* API to consume records from kakfa starting from <code>nextOffset</code> till <code>maxOffset</code>.
- * If <code>maxOffset</code> is greater than <code>nextOffset</code>, returns a null.
+ * If <code>nextOffset</code> is greater than <code>maxOffset</code>, returns null.
* <code>nextOffset</code>
* <p>
* <b>NOTE:</b> If the underlying kafka-client version does not support
@@ -110,6 +111,29 @@ public interface GobblinKafkaConsumerClient extends Closeable {
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset);
/**
+ * API to consume records from Kafka
+ * @return
+ */
+ default Iterator<KafkaConsumerRecord> consume() {
+ return Collections.emptyIterator();
+ }
+
+ /**
+ * Subscribe to a topic
+ * @param topic
+ */
+ default void subscribe(String topic) {
+ return;
+ }
+
+ /**
+ * Subscribe to a topic along with a GobblinConsumerRebalanceListener
+ * @param topic
+ */
+ default void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
+ return;
+ }
+ /**
* API to return underlying Kafka consumer metrics. The individual implementations must translate
* org.apache.kafka.common.Metric to Coda Hale Metrics. A typical use case for reporting the consumer metrics
* will call this method inside a scheduled thread.
@@ -120,6 +144,29 @@ public interface GobblinKafkaConsumerClient extends Closeable {
}
/**
+ * Commit offsets manually to Kafka asynchronously
+ */
+ default void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
+ return;
+ }
+
+ /**
+ * Commit offsets manually to Kafka synchronously
+ */
+ default void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
+ return;
+ }
+
+ /**
+ * returns the last committed offset for a KafkaPartition
+ * @param partition
+ * @return last committed offset or -1 for invalid KafkaPartition
+ */
+ default long committed(KafkaPartition partition) {
+ return -1L;
+ }
+
+ /**
* A factory to create {@link GobblinKafkaConsumerClient}s
*/
public interface GobblinKafkaConsumerClientFactory {
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
index 3279508..cba9fa2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
@@ -50,4 +50,23 @@ public interface KafkaConsumerRecord {
public default boolean isTimestampLogAppend() {
return false;
}
+
+ default boolean isTimestampCreateTime() {
+ return false;
+ }
+
+ default boolean isTimestampNone() {
+ return false;
+ }
+
+ /**
+ * @return Partition id for this record
+ */
+ int getPartition();
+
+ /**
+ * @return topic for this record
+ */
+ String getTopic();
+
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index 1255e52..bb8681e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -56,7 +56,7 @@ public class KafkaWriterConfigurationKeys {
/** Kafka producer scoped configuration keys go here **/
static final String KEY_SERIALIZER_CONFIG = "key.serializer";
static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
- static final String VALUE_SERIALIZER_CONFIG = "value.serializer";
+ public static final String VALUE_SERIALIZER_CONFIG = "value.serializer";
static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
static final String CLIENT_ID_CONFIG = "client.id";
static final String CLIENT_ID_DEFAULT = "gobblin";
@@ -68,7 +68,7 @@ public class KafkaWriterConfigurationKeys {
public static final String CLUSTER_ZOOKEEPER = KAFKA_TOPIC_CONFIG + "zookeeper";
static final String REPLICATION_COUNT = KAFKA_TOPIC_CONFIG + "replicationCount";
static final int REPLICATION_COUNT_DEFAULT = 1;
- static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG + "partitionCount";
+ public static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG + "partitionCount";
static final int PARTITION_COUNT_DEFAULT = 1;
public static final String ZOOKEEPER_SESSION_TIMEOUT = CLUSTER_ZOOKEEPER + ".sto";
static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 10000; // 10 seconds
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index b28db6b..0ff442a 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -79,7 +79,6 @@ dependencies {
compile externalDependency.quartz
compile externalDependency.slf4j
compile externalDependency.typesafeConfig
- compile externalDependency.kafka08
compile externalDependency.guavaretrying
compile externalDependency.hiveExec
compile externalDependency.parquet
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
index 40e4e48..d9bc1aa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
@@ -23,12 +23,14 @@ import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Properties;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
index 527109d..14ec6cc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
@@ -32,11 +32,12 @@ import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
-import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
+import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.api.JobSpec;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 6ac9a68..0f47625 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -24,8 +24,11 @@ import java.util.Collection;
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
import com.typesafe.config.Config;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
@@ -35,7 +38,6 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
-import kafka.message.MessageAndMetadata;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -97,9 +99,9 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
}
@Override
- protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
+ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
try {
- Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
+ Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.getValue());
for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
if (parsedMessage instanceof Either.Left) {
this.newSpecs.inc();
@@ -113,8 +115,8 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
}
}
} catch (IOException ioe) {
- String messageStr = new String(message.message(), Charsets.UTF_8);
- log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
+ String messageStr = new String(message.getValue(), Charsets.UTF_8);
+ log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.getOffset(), messageStr), ioe);
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index ae21921..4eb146b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -18,63 +18,76 @@
package org.apache.gobblin.runtime.kafka;
import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
/**
- * A high level consumer for Kafka topics. Subclasses should implement {@link HighLevelConsumer#processMessage(MessageAndMetadata)}
+ * A high level consumer for Kafka topics. Subclasses should implement {@link HighLevelConsumer#processMessage(DecodeableKafkaRecord)}
*
- * Note each thread will block for each message until {@link #processMessage(MessageAndMetadata)} returns, so for high
- * volume topics with few partitions {@link #processMessage(MessageAndMetadata)} must be fast or itself spawn more
- * threads.
+ * Note: each thread (queue) will block for each message until {@link #processMessage(DecodeableKafkaRecord)} returns
*
- * If threads > partitions in topic, extra threads will be idle.
+ * If threads(queues) > partitions in topic, extra threads(queues) will be idle.
*
- * @param <K> type of the key.
- * @param <V> type of the value.
*/
@Slf4j
-public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
+public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
+
+ public static final String CONSUMER_CLIENT_FACTORY_CLASS_KEY = "kafka.consumerClientClassFactory";
+ private static final String DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS =
+ "org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory";
+ public static final String ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
+ public static final boolean DEFAULT_AUTO_COMMIT_VALUE = false;
public static final String GROUP_ID_KEY = "group.id";
// NOTE: changing this will break stored offsets
private static final String DEFAULT_GROUP_ID = "KafkaJobSpecMonitor";
+ public static final String OFFSET_COMMIT_NUM_RECORDS_THRESHOLD_KEY = "offsets.commit.num.records.threshold";
+ public static final int DEFAULT_OFFSET_COMMIT_NUM_RECORDS_THRESHOLD = 100;
+ public static final String OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY = "offsets.commit.time.threshold.secs";
+ public static final int DEFAULT_OFFSET_COMMIT_TIME_THRESHOLD_SECS = 10;
@Getter
protected final String topic;
protected final Config config;
private final int numThreads;
- private final ConsumerConfig consumerConfig;
/**
* {@link MetricContext} for the consumer. Note this is instantiated when {@link #startUp()} is called, so
@@ -83,14 +96,71 @@ public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
@Getter
private MetricContext metricContext;
private Counter messagesRead;
- private ConsumerConnector consumer;
- private ExecutorService executor;
+ @Getter
+ private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
+ private final ScheduledExecutorService consumerExecutor;
+ private final ExecutorService queueExecutor;
+ private final BlockingQueue[] queues;
+ private final AtomicInteger recordsProcessed;
+ private final Map<KafkaPartition, Long> partitionOffsetsToCommit;
+ private final boolean enableAutoCommit;
+ private final int offsetsCommitNumRecordsThreshold;
+ private final int offsetsCommitTimeThresholdSecs;
+ private long lastCommitTime = System.currentTimeMillis();
+
+ private static final Config FALLBACK =
+ ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(GROUP_ID_KEY, DEFAULT_GROUP_ID)
+ .put(ENABLE_AUTO_COMMIT_KEY, DEFAULT_AUTO_COMMIT_VALUE)
+ .put(CONSUMER_CLIENT_FACTORY_CLASS_KEY, DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS)
+ .put(OFFSET_COMMIT_NUM_RECORDS_THRESHOLD_KEY, DEFAULT_OFFSET_COMMIT_NUM_RECORDS_THRESHOLD)
+ .put(OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY, DEFAULT_OFFSET_COMMIT_TIME_THRESHOLD_SECS)
+ .build());
public HighLevelConsumer(String topic, Config config, int numThreads) {
this.topic = topic;
this.numThreads = numThreads;
- this.config = config;
- this.consumerConfig = createConsumerConfig(config);
+ this.config = config.withFallback(FALLBACK);
+ this.gobblinKafkaConsumerClient = createConsumerClient(this.config);
+ // On partition rebalance, commit existing offsets and reset.
+ this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
+ copyAndCommit();
+ partitionOffsetsToCommit.clear();
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
+ // No op
+ }
+ });
+ this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HighLevelConsumerThread")));
+ this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("QueueProcessor-%d")));
+ this.queues = new LinkedBlockingQueue[numThreads];
+ for(int i=0;i<queues.length;i++) {
+ this.queues[i] = new LinkedBlockingQueue();
+ }
+ this.recordsProcessed = new AtomicInteger(0);
+ this.partitionOffsetsToCommit = new ConcurrentHashMap<>();
+ this.enableAutoCommit = ConfigUtils.getBoolean(config, ENABLE_AUTO_COMMIT_KEY, DEFAULT_AUTO_COMMIT_VALUE);
+ this.offsetsCommitNumRecordsThreshold = ConfigUtils.getInt(config, OFFSET_COMMIT_NUM_RECORDS_THRESHOLD_KEY, DEFAULT_OFFSET_COMMIT_NUM_RECORDS_THRESHOLD);
+ this.offsetsCommitTimeThresholdSecs = ConfigUtils.getInt(config, OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY, DEFAULT_OFFSET_COMMIT_TIME_THRESHOLD_SECS);
+ }
+
+ protected GobblinKafkaConsumerClient createConsumerClient(Config config) {
+ String kafkaConsumerClientClass = config.getString(CONSUMER_CLIENT_FACTORY_CLASS_KEY);
+
+ try {
+ Class clientFactoryClass = Class.forName(kafkaConsumerClientClass);
+ final GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory factory =
+ (GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory)
+ ConstructorUtils.invokeConstructor(clientFactoryClass);
+
+ return factory.create(config);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Failed to instantiate Kafka consumer client " + kafkaConsumerClientClass, e);
+ }
}
/**
@@ -125,81 +195,129 @@ public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
protected List<Tag<?>> getTagsForMetrics() {
List<Tag<?>> tags = Lists.newArrayList();
tags.add(new Tag<>(RuntimeMetrics.TOPIC, this.topic));
- tags.add(new Tag<>(RuntimeMetrics.GROUP_ID, this.consumerConfig.groupId()));
+ tags.add(new Tag<>(RuntimeMetrics.GROUP_ID, this.config.getString(GROUP_ID_KEY)));
return tags;
}
/**
- * Called every time a message is read from the stream. Implementation must be thread-safe if {@link #numThreads} is
+ * Called every time a message is read from the queue. Implementation must be thread-safe if {@link #numThreads} is
* set larger than 1.
*/
- protected abstract void processMessage(MessageAndMetadata<K, V> message);
+ protected abstract void processMessage(DecodeableKafkaRecord<K,V> message);
@Override
protected void startUp() {
buildMetricsContextAndMetrics();
- this.consumer = createConsumerConnector();
-
- List<KafkaStream<byte[], byte[]>> streams = createStreams();
- this.executor = Executors.newFixedThreadPool(this.numThreads);
+ // Method that starts threads that processes queues
+ processQueues();
+ // Main thread that constantly polls messages from kafka
+ consumerExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ consume();
+ }
+ }, 0, 50, TimeUnit.MILLISECONDS);
+ }
- // now create an object to consume the messages
- //
- int threadNumber = 0;
- for (final KafkaStream stream : streams) {
- this.executor.execute(new MonitorConsumer(stream));
- threadNumber++;
+ /**
+ * Consumes {@link KafkaConsumerRecord}s and adds to a queue
+ * Note: All records from a KafkaPartition are added to the same queue.
+ * A queue can contain records from multiple partitions if partitions > numThreads(queues)
+ */
+ private void consume() {
+ try {
+ Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
+ if(!enableAutoCommit) {
+ commitOffsets();
+ }
+ while (itr.hasNext()) {
+ KafkaConsumerRecord record = itr.next();
+ int idx = record.getPartition() % numThreads;
+ queues[idx].put(record);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
- protected ConsumerConfig createConsumerConfig(Config config) {
- Properties props = ConfigUtils.configToProperties(config);
+ /**
+ * Assigns a queue to each thread of the {@link #queueExecutor}
+ * Note: Assumption here is that {@link #numThreads} is the same as the number of queues
+ */
+ private void processQueues() {
+ for(BlockingQueue queue : queues) {
+ queueExecutor.execute(new QueueProcessor(queue));
+ }
+ }
- if (!props.containsKey(GROUP_ID_KEY)) {
- props.setProperty(GROUP_ID_KEY, DEFAULT_GROUP_ID);
+ /**
+ * Commits offsets to kafka
+ */
+ private void commitOffsets() {
+ if(shouldCommitOffsets()) {
+ copyAndCommit();
}
- return new ConsumerConfig(props);
}
- protected ConsumerConnector createConsumerConnector() {
- return Consumer.createJavaConsumerConnector(this.consumerConfig);
+ @VisibleForTesting
+ protected void commitOffsets(Map<KafkaPartition, Long> partitionOffsets) {
+ this.gobblinKafkaConsumerClient.commitOffsetsAsync(partitionOffsets);
}
- protected List<KafkaStream<byte[], byte[]>> createStreams() {
- Map<String, Integer> topicCountMap = Maps.newHashMap();
- topicCountMap.put(this.topic, this.numThreads);
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = this.consumer.createMessageStreams(topicCountMap);
- return consumerMap.get(this.topic);
+ private void copyAndCommit() {
+ Map<KafkaPartition, Long> copy = new HashMap<>(partitionOffsetsToCommit);
+ recordsProcessed.set(0);
+ lastCommitTime = System.currentTimeMillis();
+ commitOffsets(copy);
+ }
+
+ private boolean shouldCommitOffsets() {
+ return recordsProcessed.intValue() >= offsetsCommitNumRecordsThreshold || ((System.currentTimeMillis() - lastCommitTime) / 1000 >= offsetsCommitTimeThresholdSecs);
}
@Override
public void shutDown() {
- if (this.consumer != null) {
- this.consumer.shutdown();
- }
- if (this.executor != null) {
- ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(log), 5000, TimeUnit.MILLISECONDS);
- }
+ ExecutorsUtils.shutdownExecutorService(this.consumerExecutor, Optional.of(log), 5000, TimeUnit.MILLISECONDS);
+ ExecutorsUtils.shutdownExecutorService(this.queueExecutor, Optional.of(log), 5000, TimeUnit.MILLISECONDS);
try {
+ this.gobblinKafkaConsumerClient.close();
this.shutdownMetrics();
- } catch (IOException ioe) {
- log.warn("Failed to shutdown metrics for " + this.getClass().getSimpleName());
+ } catch (IOException e) {
+ log.warn("Failed to shut down consumer client or metrics ", e);
}
}
/**
- * A monitor for a Kafka stream.
+ * Polls a {@link BlockingQueue} indefinitely for {@link KafkaConsumerRecord}
+ * Processes each record and maintains a count for #recordsProcessed
+ * Also records the latest offset for every {@link KafkaPartition} if auto commit is disabled
+ * Note: Messages in this queue will always be in order for every {@link KafkaPartition}
*/
- @AllArgsConstructor
- public class MonitorConsumer implements Runnable {
- private final KafkaStream stream;
+ class QueueProcessor implements Runnable {
+ private final BlockingQueue<KafkaConsumerRecord> queue;
+
+ public QueueProcessor(BlockingQueue queue) {
+ this.queue = queue;
+ }
+ @Override
public void run() {
- ConsumerIterator<K, V> it = this.stream.iterator();
- while (it.hasNext()) {
- MessageAndMetadata<K, V> message = it.next();
- HighLevelConsumer.this.messagesRead.inc();
- HighLevelConsumer.this.processMessage(message);
+ log.info("Starting queue processing.. " + Thread.currentThread().getName());
+ try {
+ while(true) {
+ KafkaConsumerRecord record = queue.take();
+ messagesRead.inc();
+ HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
+ recordsProcessed.incrementAndGet();
+
+ if(!HighLevelConsumer.this.enableAutoCommit) {
+ KafkaPartition partition = new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic).build();
+ // Committed offset should always be the offset of the next record to be read (hence +1)
+ partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
deleted file mode 100644
index c825a12..0000000
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.job_monitor;
-
-import java.net.URI;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
-
-
-public class KafkaJobMonitorTest {
-
- @Test
- public void test() throws Exception {
-
- Config config = HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX));
-
- MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(config);
- monitor.startAsync();
-
- monitor.getMockKafkaStream().pushToStream("job1:1");
- monitor.awaitExactlyNSpecs(1);
- Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job1")));
- Assert.assertEquals(monitor.getJobSpecs().get(new URI("job1")).getVersion(), "1");
-
- monitor.getMockKafkaStream().pushToStream("job2:1");
- monitor.awaitExactlyNSpecs(2);
- Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
- Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "1");
-
- monitor.getMockKafkaStream().pushToStream(MockedKafkaJobMonitor.REMOVE + ":job1");
- monitor.awaitExactlyNSpecs(1);
- Assert.assertFalse(monitor.getJobSpecs().containsKey(new URI("job1")));
- Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
-
- monitor.getMockKafkaStream().pushToStream("job2:2,job1:2");
- monitor.awaitExactlyNSpecs(2);
- Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job1")));
- Assert.assertEquals(monitor.getJobSpecs().get(new URI("job1")).getVersion(), "2");
- Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
- Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "2");
-
- monitor.shutDown();
- }
-}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockKafkaStream.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockKafkaStream.java
deleted file mode 100644
index 7787519..0000000
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockKafkaStream.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.job_monitor;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.mockito.Mockito;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
-
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.FetchedDataChunk;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.PartitionTopicInfo;
-import kafka.consumer.ZookeeperConsumerConnector;
-import kafka.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.NoCompressionCodec$;
-import kafka.serializer.DefaultDecoder;
-import kafka.utils.VerifiableProperties;
-import lombok.Getter;
-import scala.collection.JavaConversions;
-
-
-public class MockKafkaStream {
-
- private final List<BlockingQueue<FetchedDataChunk>> queues;
- @Getter
- private final List<KafkaStream<byte[], byte[]>> mockStreams;
- private final List<AtomicLong> offsets;
- private final AtomicLong nextStream;
-
- public MockKafkaStream(int numStreams) {
-
- this.queues = Lists.newArrayList();
- this.mockStreams = Lists.newArrayList();
- this.offsets = Lists.newArrayList();
-
- for (int i = 0; i < numStreams; i++) {
- BlockingQueue<FetchedDataChunk> queue = Queues.newLinkedBlockingQueue();
- this.queues.add(queue);
- this.mockStreams.add(createMockStream(queue));
- this.offsets.add(new AtomicLong(0));
- }
-
- this.nextStream = new AtomicLong(-1);
- }
-
- @SuppressWarnings("unchecked")
- private static KafkaStream<byte[], byte[]> createMockStream(BlockingQueue<FetchedDataChunk> queue) {
-
- KafkaStream<byte[], byte[]> stream = (KafkaStream<byte[], byte[]>) Mockito.mock(KafkaStream.class);
- ConsumerIterator<byte[], byte[]> it =
- new ConsumerIterator<>(queue, -1, new DefaultDecoder(new VerifiableProperties()), new DefaultDecoder(new VerifiableProperties()), "clientId");
- Mockito.when(stream.iterator()).thenReturn(it);
-
- return stream;
- }
-
- public void pushToStream(String message) {
-
- int streamNo = (int) this.nextStream.incrementAndGet() % this.queues.size();
-
- AtomicLong offset = this.offsets.get(streamNo);
- BlockingQueue<FetchedDataChunk> queue = this.queues.get(streamNo);
-
- AtomicLong thisOffset = new AtomicLong(offset.incrementAndGet());
-
- List<Message> seq = Lists.newArrayList();
- seq.add(new Message(message.getBytes(Charsets.UTF_8)));
- ByteBufferMessageSet messageSet = new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, offset, JavaConversions.asScalaBuffer(seq));
-
- FetchedDataChunk chunk = new FetchedDataChunk(messageSet,
- new PartitionTopicInfo("topic", streamNo, queue, thisOffset, thisOffset, new AtomicInteger(1), "clientId"),
- thisOffset.get());
-
- queue.add(chunk);
- }
-
- public void shutdown() {
- for (BlockingQueue<FetchedDataChunk> queue : this.queues) {
- queue.add(ZookeeperConsumerConnector.shutdownCommand());
- }
- }
-
-}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
index 9e55236..0562534 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
@@ -31,6 +31,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
@@ -38,19 +39,18 @@ import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.Either;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-class MockedKafkaJobMonitor extends KafkaJobMonitor {
+public class MockedKafkaJobMonitor extends KafkaJobMonitor {
private static final Splitter SPLITTER_COMMA = Splitter.on(",");
private static final Splitter SPLITTER_COLON = Splitter.on(":");
@@ -58,18 +58,15 @@ class MockedKafkaJobMonitor extends KafkaJobMonitor {
@Getter
private final Map<URI, JobSpec> jobSpecs;
- @Getter
- private final MockKafkaStream mockKafkaStream;
- public static MockedKafkaJobMonitor create(Config config) {
- return new MockedKafkaJobMonitor(config, Maps.<URI, JobSpec>newConcurrentMap());
+ public static MockedKafkaJobMonitor create(String topic, Config config) {
+ return new MockedKafkaJobMonitor(topic, config, Maps.<URI, JobSpec>newConcurrentMap());
}
- private MockedKafkaJobMonitor(Config config, Map<URI, JobSpec> jobSpecs) {
- super("topic", createMockCatalog(jobSpecs), config);
+ private MockedKafkaJobMonitor(String topic, Config config, Map<URI, JobSpec> jobSpecs) {
+ super(topic, createMockCatalog(jobSpecs), config);
this.jobSpecs = jobSpecs;
- this.mockKafkaStream = new MockKafkaStream(1);
}
private static MutableJobCatalog createMockCatalog(final Map<URI, JobSpec> jobSpecs) {
@@ -127,18 +124,7 @@ class MockedKafkaJobMonitor extends KafkaJobMonitor {
}
@Override
- protected List<KafkaStream<byte[], byte[]>> createStreams() {
- return this.mockKafkaStream.getMockStreams();
- }
-
- @Override
- protected ConsumerConnector createConsumerConnector() {
- return Mockito.mock(ConsumerConnector.class);
- }
-
- @Override
public void shutDown() {
- this.mockKafkaStream.shutdown();
super.shutDown();
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
deleted file mode 100644
index e8d4e6c..0000000
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.kafka;
-
-import java.io.File;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.job_monitor.MockKafkaStream;
-
-
-public class HighLevelConsumerTest {
-
- public static Config getSimpleConfig(Optional<String> prefix) {
- Properties properties = new Properties();
- properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper");
- properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true");
- File tmpDir = Files.createTempDir();
- tmpDir.deleteOnExit();
- properties.put(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, tmpDir.toString());
-
- return ConfigFactory.parseProperties(properties);
- }
-
- private static String getConfigKey(Optional<String> prefix, String key) {
- return prefix.isPresent() ? prefix.get() + "." + key : key;
- }
-
- @Test
- public void test() throws Exception {
-
- MockKafkaStream mockKafkaStream = new MockKafkaStream(5);
- MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(getSimpleConfig(Optional.<String>absent()), 5, mockKafkaStream);
-
- consumer.startAsync();
- consumer.awaitRunning();
-
- Assert.assertTrue(consumer.getMessages().isEmpty());
-
- mockKafkaStream.pushToStream("message");
-
- consumer.awaitAtLeastNMessages(1);
- Assert.assertEquals(consumer.getMessages().get(0), "message");
-
- mockKafkaStream.pushToStream("message2");
- consumer.awaitAtLeastNMessages(2);
- Assert.assertEquals(consumer.getMessages().get(1), "message2");
-
- consumer.shutDown();
- mockKafkaStream.pushToStream("message3");
- try {
- consumer.awaitAtLeastNMessages(3);
- Assert.fail();
- } catch (TimeoutException ie) {
- // should throw this
- }
- }
-}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
index 5035afa..520b354 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
@@ -18,66 +18,59 @@
package org.apache.gobblin.runtime.kafka;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
-import org.mockito.Mockito;
-
-import com.google.common.base.Charsets;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
-import org.apache.gobblin.runtime.job_monitor.MockKafkaStream;
-import org.apache.gobblin.testing.AssertWithBackoff;
-
import javax.annotation.Nullable;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.testing.AssertWithBackoff;
@Slf4j
-class MockedHighLevelConsumer extends HighLevelConsumer<byte[], byte[]> {
- private final MockKafkaStream mockKafkaStream;
- @Getter
- private final List<String> messages;
+public class MockedHighLevelConsumer extends HighLevelConsumer<byte[], byte[]> {
- public MockedHighLevelConsumer(Config config, int numThreads, MockKafkaStream stream) {
- super("topic", config, numThreads);
+ @Getter
+ private final List<byte[]> messages;
+ @Getter
+ private final Map<KafkaPartition, Long> committedOffsets;
- this.mockKafkaStream = stream;
+ public MockedHighLevelConsumer(String topic, Config config, int numThreads) {
+ super(topic, config, numThreads);
this.messages = Lists.newArrayList();
+ this.committedOffsets = new ConcurrentHashMap<>();
}
- public void awaitAtLeastNMessages(final int n) throws Exception {
+ public void awaitExactlyNMessages(final int n, int timeoutMillis) throws Exception {
AssertWithBackoff.assertTrue(new Predicate<Void>() {
@Override
public boolean apply(@Nullable Void input) {
- return MockedHighLevelConsumer.this.messages.size() >= n;
+ return MockedHighLevelConsumer.this.messages.size() == n;
}
- }, 1000, n + " messages", log, 2, 1000);
- }
-
- @Override
- protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
- this.messages.add(new String(message.message(), Charsets.UTF_8));
+ }, timeoutMillis, n + " messages", log, 2, 1000);
}
@Override
- protected List<KafkaStream<byte[], byte[]>> createStreams() {
- return this.mockKafkaStream.getMockStreams();
+ protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+ this.messages.add(message.getValue());
}
@Override
- protected ConsumerConnector createConsumerConnector() {
- return Mockito.mock(ConsumerConnector.class);
+ protected void commitOffsets(Map<KafkaPartition, Long> partitionOffsets) {
+ super.commitOffsets(partitionOffsets);
+ committedOffsets.putAll(partitionOffsets.entrySet().stream().collect(Collectors
+ .toMap(e -> e.getKey(), e -> e.getValue())));
}
@Override
public void shutDown() {
- this.mockKafkaStream.shutdown();
super.shutDown();
}
}
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index bafa6d1..dee31a3 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -60,7 +60,6 @@ dependencies {
compile externalDependency.jgit
compile externalDependency.jodaTime
compile externalDependency.jgrapht
- compile externalDependency.kafka08
compile externalDependency.log4j
compile externalDependency.lombok
compile externalDependency.metricsCore
@@ -70,12 +69,12 @@ dependencies {
compile externalDependency.slf4j
compile externalDependency.typesafeConfig
compile externalDependency.zkClient
+ compile externalDependency.joptSimple
testCompile project(":gobblin-example")
// Required for adding Test class into classpath
testCompile project(":gobblin-runtime").sourceSets.test.output
- testCompile project(path: ":gobblin-modules:gobblin-kafka-08:", configuration: "tests")
testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
testCompile project(":gobblin-test-utils")
testCompile externalDependency.byteman
@@ -88,7 +87,6 @@ dependencies {
testCompile externalDependency.hamcrest
testCompile externalDependency.jhyde
testCompile externalDependency.mockito
- testCompile externalDependency.kafka08Test
}
// Begin HACK to get around POM being depenendent on the (empty) gobblin-rest-api instead of gobblin-rest-api-rest-client
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index be0947d..279c769 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -33,11 +33,11 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import avro.shaded.com.google.common.annotations.VisibleForTesting;
-import kafka.message.MessageAndMetadata;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
@@ -114,15 +114,15 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
}
@Override
- protected void processMessage(MessageAndMetadata<byte[],byte[]> message) {
+ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
try {
- org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.message());
+ org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.getValue());
if (jobStatus != null) {
addJobStatusToStateStore(jobStatus, this.stateStore);
}
} catch (IOException ioe) {
- String messageStr = new String(message.message(), Charsets.UTF_8);
- log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
+ String messageStr = new String(message.getValue(), Charsets.UTF_8);
+ log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.getOffset(), messageStr), ioe);
}
}