You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/20 00:07:37 UTC
[1/2] incubator-gobblin git commit: [GOBBLIN-292] Add kafka09 support for service and cluster job spec communication
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 8f32ab4c1 -> 0ee3cfdca
[GOBBLIN-292] Add kafka09 support for service and cluster job spec communication
Add pluggable consumer client to SimpleKafkaSpecConsumer.
Add pluggable producer to SimpleKafkaSpecProducer.
Move common logic into the gobblin-service-kafka module.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/590b872b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/590b872b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/590b872b
Branch: refs/heads/master
Commit: 590b872b094b5a27b2a12e2bc66d1a7514309bb5
Parents: 626d312
Author: Hung Tran <hu...@linkedin.com>
Authored: Thu Oct 19 13:09:35 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Oct 19 13:09:35 2017 -0700
----------------------------------------------------------------------
gobblin-modules/gobblin-kafka-08/build.gradle | 1 +
.../service/SimpleKafkaSpecConsumer.java | 264 -----------------
.../service/SimpleKafkaSpecExecutor.java | 105 -------
.../service/SimpleKafkaSpecProducer.java | 140 ---------
.../service/StreamingKafkaSpecConsumer.java | 173 -----------
.../kafka/client/Kafka09ConsumerClient.java | 19 +-
.../client/AbstractBaseKafkaConsumerClient.java | 3 +-
.../gobblin-service-kafka/build.gradle | 44 +++
.../service/AvroJobSpecDeserializer.java | 70 +++++
.../service/SimpleKafkaSpecConsumer.java | 287 +++++++++++++++++++
.../service/SimpleKafkaSpecExecutor.java | 102 +++++++
.../service/SimpleKafkaSpecProducer.java | 157 ++++++++++
.../service/StreamingKafkaSpecConsumer.java | 175 +++++++++++
13 files changed, 855 insertions(+), 685 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/build.gradle b/gobblin-modules/gobblin-kafka-08/build.gradle
index b72ce3a..bc20884 100644
--- a/gobblin-modules/gobblin-kafka-08/build.gradle
+++ b/gobblin-modules/gobblin-kafka-08/build.gradle
@@ -56,6 +56,7 @@ dependencies {
runtime externalDependency.confluentSchemaRegistryClient
runtime externalDependency.protobuf
+ testCompile project(":gobblin-modules:gobblin-service-kafka")
testCompile project(":gobblin-runtime")
testCompile project(":gobblin-test-utils")
testCompile externalDependency.jsonAssert
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
deleted file mode 100644
index 083ccf3..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
-import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
-import org.apache.gobblin.kafka.client.Kafka08ConsumerClient;
-import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
-import org.apache.gobblin.util.CompletedFuture;
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-@Slf4j
-public class SimpleKafkaSpecConsumer implements SpecConsumer<Spec>, Closeable {
-
- // Consumer
- protected final GobblinKafkaConsumerClient _kafka08Consumer;
- protected final List<KafkaPartition> _partitions;
- protected final List<Long> _lowWatermark;
- protected final List<Long> _nextWatermark;
- protected final List<Long> _highWatermark;
-
- private Iterator<KafkaConsumerRecord> messageIterator = null;
- private int currentPartitionIdx = -1;
- private boolean isFirstRun = true;
-
- private final BinaryDecoder _decoder;
- private final SpecificDatumReader<AvroJobSpec> _reader;
- private final SchemaVersionWriter<?> _versionWriter;
-
- public SimpleKafkaSpecConsumer(Config config, Optional<Logger> log) {
-
- // Consumer
- _kafka08Consumer = new Kafka08ConsumerClient.Factory().create(config);
- List<KafkaTopic> kafkaTopics = _kafka08Consumer.getFilteredTopics(Collections.EMPTY_LIST,
- Lists.newArrayList(Pattern.compile(config.getString(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY))));
- _partitions = kafkaTopics.get(0).getPartitions();
- _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
- _nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
- _highWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
-
- InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
- _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null);
- _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$);
- _versionWriter = new FixedSchemaVersionWriter();
- }
-
- public SimpleKafkaSpecConsumer(Config config, Logger log) {
- this(config, Optional.of(log));
- }
-
- /** Constructor with no logging */
- public SimpleKafkaSpecConsumer(Config config) {
- this(config, Optional.<Logger>absent());
- }
-
- @Override
- public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
- List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>();
- initializeWatermarks();
- this.currentPartitionIdx = -1;
- while (!allPartitionsFinished()) {
- if (currentPartitionFinished()) {
- moveToNextPartition();
- continue;
- }
- if (this.messageIterator == null || !this.messageIterator.hasNext()) {
- try {
- this.messageIterator = fetchNextMessageBuffer();
- } catch (Exception e) {
- log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
- getCurrentPartition()), e);
- moveToNextPartition();
- continue;
- }
- if (this.messageIterator == null || !this.messageIterator.hasNext()) {
- moveToNextPartition();
- continue;
- }
- }
- while (!currentPartitionFinished()) {
- if (!this.messageIterator.hasNext()) {
- break;
- }
-
- KafkaConsumerRecord nextValidMessage = this.messageIterator.next();
-
- // Even though we ask Kafka to give us a message buffer starting from offset x, it may
- // return a buffer that starts from offset smaller than x, so we need to skip messages
- // until we get to x.
- if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) {
- continue;
- }
-
- _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
- try {
- final AvroJobSpec record;
-
- if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
- record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
- } else if (nextValidMessage instanceof DecodeableKafkaRecord){
- record = ((DecodeableKafkaRecord<?, AvroJobSpec>) nextValidMessage).getValue();
- } else {
- throw new IllegalStateException(
- "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
- + " or DecodeableKafkaRecord");
- }
-
- JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri());
-
- Properties props = new Properties();
- props.putAll(record.getProperties());
- jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion())
- .withDescription(record.getDescription()).withConfigAsProperties(props);
-
- if (!record.getTemplateUri().isEmpty()) {
- jobSpecBuilder.withTemplate(new URI(record.getTemplateUri()));
- }
-
- String verbName = record.getMetadata().get(VERB_KEY);
- Verb verb = Verb.valueOf(verbName);
-
- changesSpecs.add(new ImmutablePair<Verb, Spec>(verb, jobSpecBuilder.build()));
- } catch (Throwable t) {
- log.error("Could not decode record at partition " + this.currentPartitionIdx +
- " offset " + nextValidMessage.getOffset());
- }
- }
- }
-
- return new CompletedFuture(changesSpecs, null);
- }
-
- private void initializeWatermarks() {
- initializeLowWatermarks();
- initializeHighWatermarks();
- }
-
- private void initializeLowWatermarks() {
- try {
- int i=0;
- for (KafkaPartition kafkaPartition : _partitions) {
- if (isFirstRun) {
- long earliestOffset = _kafka08Consumer.getEarliestOffset(kafkaPartition);
- _lowWatermark.set(i, earliestOffset);
- } else {
- _lowWatermark.set(i, _highWatermark.get(i));
- }
- i++;
- }
- isFirstRun = false;
- } catch (KafkaOffsetRetrievalFailureException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void initializeHighWatermarks() {
- try {
- int i=0;
- for (KafkaPartition kafkaPartition : _partitions) {
- long latestOffset = _kafka08Consumer.getLatestOffset(kafkaPartition);
- _highWatermark.set(i, latestOffset);
- i++;
- }
- } catch (KafkaOffsetRetrievalFailureException e) {
- throw new RuntimeException(e);
- }
- }
-
- private boolean allPartitionsFinished() {
- return this.currentPartitionIdx >= _nextWatermark.size();
- }
-
- private boolean currentPartitionFinished() {
- if (this.currentPartitionIdx == -1) {
- return true;
- } else if (_nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx)) {
- return true;
- } else {
- return false;
- }
- }
-
- private int moveToNextPartition() {
- this.messageIterator = null;
- return this.currentPartitionIdx ++;
- }
-
- private KafkaPartition getCurrentPartition() {
- return _partitions.get(this.currentPartitionIdx);
- }
-
- private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() {
- return _kafka08Consumer.consume(_partitions.get(this.currentPartitionIdx),
- _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx));
- }
-
- private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
- InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes());
- _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));
-
- Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);
-
- return _reader.read(null, decoder);
- }
-
- @Override
- public void close() throws IOException {
- _kafka08Consumer.close();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
deleted file mode 100644
index 8545bf6..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.Future;
-
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
-import com.google.common.io.Closer;
-
-import org.slf4j.Logger;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
-
-/**
- * An {@link SpecExecutor} that use Kafka as the communication mechanism.
- */
-public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor {
- public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics";
-
-
- protected static final String VERB_KEY = "Verb";
-
- private SpecProducer<Spec> specProducer;
-
- public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) {
- super(config, log);
- specProducer = new SimpleKafkaSpecProducer(config, log);
- }
-
- /**
- * Constructor with no logging, necessary for simple use case.
- * @param config
- */
- public SimpleKafkaSpecExecutor(Config config) {
- this(config, Optional.absent());
- }
-
- @Override
- public Future<? extends SpecProducer> getProducer() {
- return new CompletedFuture<>(this.specProducer, null);
- }
-
- @Override
- public Future<String> getDescription() {
- return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
- }
-
- @Override
- protected void startUp() throws Exception {
- optionalCloser = Optional.of(Closer.create());
- specProducer = optionalCloser.get().register((SimpleKafkaSpecProducer) specProducer);
- }
-
- @Override
- protected void shutDown() throws Exception {
- if (optionalCloser.isPresent()) {
- optionalCloser.get().close();
- } else {
- log.warn("There's no Closer existed in " + this.getClass().getName());
- }
- }
-
- public static class SpecExecutorInstanceDataPacket implements Serializable {
-
- protected Verb _verb;
- protected URI _uri;
- protected Spec _spec;
-
- public SpecExecutorInstanceDataPacket(Verb verb, URI uri, Spec spec) {
- _verb = verb;
- _uri = uri;
- _spec = spec;
- }
-
- @Override
- public String toString() {
- return String.format("Verb: %s, URI: %s, Spec: %s", _verb, _uri, _spec);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
deleted file mode 100644
index 13aae6f..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.Future;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-
-import org.slf4j.Logger;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.kafka.writer.Kafka08DataWriter;
-import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.WriteCallback;
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-@NotThreadSafe
-public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
-
- // Producer
- protected Kafka08DataWriter<byte[]> _kafka08Producer;
- private final AvroSerializer<AvroJobSpec> _serializer;
- private Config _config;
-
- public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) {
-
- try {
- _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter());
- _config = config;
- } catch (IOException e) {
- throw new RuntimeException("Could not create AvroBinarySerializer", e);
- }
- }
-
- public SimpleKafkaSpecProducer(Config config, Logger log) {
- this(config, Optional.of(log));
- }
-
- /** Constructor with no logging */
- public SimpleKafkaSpecProducer(Config config) {
- this(config, Optional.<Logger>absent());
- }
-
- @Override
- public Future<?> addSpec(Spec addedSpec) {
- AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
-
- log.info("Adding Spec: " + addedSpec + " using Kafka.");
-
- return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
- }
-
- @Override
- public Future<?> updateSpec(Spec updatedSpec) {
- AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
-
- log.info("Updating Spec: " + updatedSpec + " using Kafka.");
-
- return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
- }
-
- @Override
- public Future<?> deleteSpec(URI deletedSpecURI) {
-
- AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
- .setMetadata(ImmutableMap.of(VERB_KEY, Verb.DELETE.name())).build();
-
- log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
-
- return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
- }
-
- @Override
- public Future<? extends List<Spec>> listSpecs() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws IOException {
- _kafka08Producer.close();
- }
-
- private Kafka08DataWriter<byte[]> getKafka08Producer() {
- if (null == _kafka08Producer) {
- _kafka08Producer = new Kafka08DataWriter<byte[]>(ConfigUtils.configToProperties(_config));
- }
- return _kafka08Producer;
- }
-
- private AvroJobSpec convertToAvroJobSpec(Spec spec, Verb verb) {
- if (spec instanceof JobSpec) {
- JobSpec jobSpec = (JobSpec) spec;
- AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder();
-
- avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion())
- .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
- .setMetadata(ImmutableMap.of(VERB_KEY, verb.name()));
-
- if (jobSpec.getTemplateURI().isPresent()) {
- avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString());
- }
-
- return avroJobSpecBuilder.build();
- } else {
- throw new RuntimeException("Unsupported spec type " + spec.getClass());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
deleted file mode 100644
index fd42211..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor;
-import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
-import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl;
-import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-/**
- * SpecConsumer that consumes from kafka in a streaming manner
- * Implemented {@link AbstractIdleService} for starting up and shutting down.
- */
-public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable {
- public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize";
- private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
- private final AvroJobSpecKafkaJobMonitor _jobMonitor;
- private final BlockingQueue<ImmutablePair<Verb, Spec>> _jobSpecQueue;
-
- public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) {
- String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
- Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic,
- KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST));
-
- try {
- _jobMonitor = (AvroJobSpecKafkaJobMonitor)(new AvroJobSpecKafkaJobMonitor.Factory())
- .forConfig(config.withFallback(defaults), jobCatalog);
- } catch (IOException e) {
- throw new RuntimeException("Could not create job monitor", e);
- }
-
- _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
- DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
-
- // listener will add job specs to a blocking queue to send to callers of changedSpecs()
- jobCatalog.addListener(new JobSpecListener());
- }
-
- public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) {
- this(config, jobCatalog, Optional.of(log));
- }
-
- /** Constructor with no logging */
- public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog) {
- this(config, jobCatalog, Optional.<Logger>absent());
- }
-
- /**
- * This method returns job specs receive from Kafka. It will block if there are no job specs.
- * @return list of (verb, jobspecs) pairs.
- */
- @Override
- public Future<? extends List<Pair<Verb, Spec>>> changedSpecs() {
- List<Pair<Verb, Spec>> changesSpecs = new ArrayList<>();
-
- try {
- Pair<Verb, Spec> specPair = _jobSpecQueue.take();
-
- do {
- changesSpecs.add(specPair);
-
- // if there are more elements then pass them along in this call
- specPair = _jobSpecQueue.poll();
- } while (specPair != null);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- return new CompletedFuture(changesSpecs, null);
- }
-
- @Override
- protected void startUp() {
- _jobMonitor.startAsync().awaitRunning();
- }
-
- @Override
- protected void shutDown() {
- _jobMonitor.stopAsync().awaitTerminated();
- }
-
- @Override
- public void close() throws IOException {
- shutDown();
- }
-
- /**
- * JobCatalog listener that puts messages into a blocking queue for consumption by changedSpecs method of
- * {@link StreamingKafkaSpecConsumer}
- */
- protected class JobSpecListener extends DefaultJobCatalogListenerImpl {
- public JobSpecListener() {
- super(StreamingKafkaSpecConsumer.this.log);
- }
-
- @Override public void onAddJob(JobSpec addedJob) {
- super.onAddJob(addedJob);
-
- try {
- _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.ADD, addedJob));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
- super.onDeleteJob(deletedJobURI, deletedJobVersion);
- try {
- JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI);
-
- Properties props = new Properties();
- jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props);
-
- _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.DELETE, jobSpecBuilder.build()));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override public void onUpdateJob(JobSpec updatedJob) {
- super.onUpdateJob(updatedJob);
-
- try {
- _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.UPDATE, updatedJob));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index b6cd35d..9b952ab 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.kafka.client;
import java.io.IOException;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
@@ -81,6 +82,9 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
super(config);
Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
"Missing required property " + GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY);
+
+ Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, AbstractBaseKafkaConsumerClient.CONFIG_PREFIX_NO_DOT);
+
Properties props = new Properties();
props.put(KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers));
props.put(KAFKA_09_CLIENT_ENABLE_AUTO_COMMIT_KEY, KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT);
@@ -89,6 +93,9 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
ConfigUtils.getString(config, GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY, KAFKA_09_DEFAULT_KEY_DESERIALIZER));
props.put(KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY,
config.getString(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY));
+
+ props.putAll(ConfigUtils.configToProperties(scopedConfig));
+
this.consumer = new KafkaConsumer<>(props);
}
@@ -112,12 +119,20 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
+  /**
+   * Returns the earliest offset of {@code partition}: the consumer is (re)assigned to this single
+   * partition, seeked to its beginning, and the resulting position is returned.
+   */
@Override
public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
- throw new UnsupportedOperationException("getEarliestOffset and getLatestOffset is not supported by Kafka-09");
+    TopicPartition tp = new TopicPartition(partition.getTopicName(), partition.getId());
+    List<TopicPartition> singlePartition = Collections.singletonList(tp);
+    this.consumer.assign(singlePartition);
+    this.consumer.seekToBeginning(tp);
+    long earliestOffset = this.consumer.position(tp);
+    return earliestOffset;
}
+  /**
+   * Returns the latest offset of {@code partition}, i.e. the consumer position after seeking to the
+   * end of the partition. Note that this reassigns the consumer to {@code partition} alone.
+   */
@Override
public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
- throw new UnsupportedOperationException("getEarliestOffset and getLatestOffset is not supported by Kafka-09");
+    final TopicPartition target = new TopicPartition(partition.getTopicName(), partition.getId());
+    this.consumer.assign(Collections.singletonList(target));
+    this.consumer.seekToEnd(target);
+    return this.consumer.position(target);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 00b751b..638a879 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -41,7 +41,8 @@ import javax.annotation.Nullable;
*/
public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaConsumerClient {
- public static final String CONFIG_PREFIX = "source.kafka.";
+ public static final String CONFIG_PREFIX_NO_DOT = "source.kafka";
+ public static final String CONFIG_PREFIX = CONFIG_PREFIX_NO_DOT + ".";
public static final String CONFIG_KAFKA_FETCH_TIMEOUT_VALUE = CONFIG_PREFIX + "fetchTimeoutMillis";
public static final int CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT = 1000; // 1 second
public static final String CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES = CONFIG_PREFIX + "fetchMinBytes";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/build.gradle b/gobblin-modules/gobblin-service-kafka/build.gradle
new file mode 100644
index 0000000..e4d1146
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/build.gradle
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+
+configurations {
+  // Transitive resolution is disabled for 'compile', so every compile-time dependency of this
+  // module has to be listed explicitly in the dependencies block below.
+  compile { transitive = false }
+
+  // xerces / xml-apis are excluded because of versioning conflicts; the standard JRE
+  // implementation should work instead. Background:
+  // http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven,
+  // HADOOP-5254 and MAPREDUCE-5664.
+  all*.exclude group: 'xml-apis'
+  all*.exclude group: 'xerces'
+}
+
+dependencies {
+  compile project(":gobblin-runtime")
+
+  compile externalDependency.avro
+  compile externalDependency.slf4j
+  compile externalDependency.lombok
+  compile externalDependency.typesafeConfig
+
+  testCompile externalDependency.testng
+}
+
+test {
+  // Tests run with the repository root as their working directory.
+  workingDir rootProject.rootDir
+}
+
+ext.classification = "library"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
new file mode 100644
index 0000000..879dea4
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A Kafka {@link Deserializer} that converts a byte array into an {@link AvroJobSpec}.
+ *
+ * <p>Each payload is expected to start with the schema-versioning header read by
+ * {@link FixedSchemaVersionWriter}, followed by the Avro binary encoding of the record
+ * (presumably the format produced by the AvroBinarySerializer/FixedSchemaVersionWriter pair used in
+ * SimpleKafkaSpecProducer — verify if either side changes).
+ *
+ * <p>All decoding state is created at construction time, so the instance works even if
+ * {@link #configure(Map, boolean)} is never called. NOTE(review): one {@link BinaryDecoder} is passed
+ * as the reuse argument on every call, so sharing an instance across threads is presumably unsafe.
+ */
+@Slf4j
+public class AvroJobSpecDeserializer implements Deserializer<AvroJobSpec> {
+  private final BinaryDecoder _decoder =
+      DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(new byte[0]), null);
+  private final SpecificDatumReader<AvroJobSpec> _reader = new SpecificDatumReader<>(AvroJobSpec.SCHEMA$);
+  private final SchemaVersionWriter<?> _versionWriter = new FixedSchemaVersionWriter();
+
+  /** No configuration is needed: decoding state is already initialized when the instance is built. */
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+  }
+
+  /**
+   * Decodes {@code data} into an {@link AvroJobSpec}.
+   *
+   * @param topic the topic the bytes were read from (unused)
+   * @param data the serialized record; may be {@code null}
+   * @return the decoded record, or {@code null} when {@code data} is {@code null}
+   * @throws RuntimeException wrapping the underlying {@link IOException} when decoding fails
+   */
+  @Override
+  public AvroJobSpec deserialize(String topic, byte[] data) {
+    if (data == null) {
+      // Null payloads (e.g. tombstones) carry no record; avoid an NPE from ByteArrayInputStream.
+      return null;
+    }
+    try (InputStream is = new ByteArrayInputStream(data)) {
+      // Consume the schema-version header before handing the remaining bytes to Avro.
+      _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));
+
+      Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);
+
+      return _reader.read(null, decoder);
+    } catch (IOException e) {
+      // Keep the cause so the actual decoding failure is visible to callers and in logs.
+      throw new RuntimeException("Could not decode message", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
new file mode 100644
index 0000000..139d204
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link SpecConsumer} that reads {@link AvroJobSpec} records from Kafka and turns them into
+ * (verb, {@link JobSpec}) pairs.
+ *
+ * <p>The Kafka client is created through a pluggable
+ * {@link GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory} whose class name is read from
+ * {@code spec.kafka.consumerClientClassFactory} (the Kafka 0.8 client factory by default). Topics are
+ * selected with the pattern in {@link SimpleKafkaSpecExecutor#SPEC_KAFKA_TOPICS_KEY}; only the first
+ * matching topic is consumed.
+ */
+@Slf4j
+public class SimpleKafkaSpecConsumer implements SpecConsumer<Spec>, Closeable {
+  private static final String CONSUMER_CLIENT_FACTORY_CLASS_KEY = "spec.kafka.consumerClientClassFactory";
+  private static final String DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS =
+      "org.apache.gobblin.kafka.client.Kafka08ConsumerClient$Factory";
+
+  // Consumer
+  protected final GobblinKafkaConsumerClient _kafkaConsumer;
+  protected final List<KafkaPartition> _partitions;
+  // Per-partition offsets; index i refers to _partitions.get(i).
+  protected final List<Long> _lowWatermark;
+  protected final List<Long> _nextWatermark;
+  protected final List<Long> _highWatermark;
+
+  private Iterator<KafkaConsumerRecord> messageIterator = null;
+  private int currentPartitionIdx = -1;
+  private boolean isFirstRun = true;
+
+  private final BinaryDecoder _decoder;
+  private final SpecificDatumReader<AvroJobSpec> _reader;
+  private final SchemaVersionWriter<?> _versionWriter;
+
+  /**
+   * @param config configuration; must contain {@link SimpleKafkaSpecExecutor#SPEC_KAFKA_TOPICS_KEY}
+   * @param log optional logger used to report consumer-client instantiation failures
+   * @throws RuntimeException if the consumer client cannot be instantiated
+   * @throws IllegalArgumentException if no Kafka topic matches the configured topic pattern
+   */
+  public SimpleKafkaSpecConsumer(Config config, Optional<Logger> log) {
+
+    // Consumer
+    String kafkaConsumerClientClass = ConfigUtils.getString(config, CONSUMER_CLIENT_FACTORY_CLASS_KEY,
+        DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS);
+
+    try {
+      Class<?> clientFactoryClass = Class.forName(kafkaConsumerClientClass);
+      final GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory factory =
+          (GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory)
+              ConstructorUtils.invokeConstructor(clientFactoryClass);
+
+      _kafkaConsumer = factory.create(config);
+    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
+        | InvocationTargetException e) {
+      if (log.isPresent()) {
+        log.get().error("Failed to instantiate Kafka consumer from class " + kafkaConsumerClientClass, e);
+      }
+
+      throw new RuntimeException("Failed to instantiate Kafka consumer", e);
+    }
+
+    String topicPattern = config.getString(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY);
+    List<KafkaTopic> kafkaTopics = _kafkaConsumer.getFilteredTopics(Collections.<Pattern>emptyList(),
+        Lists.newArrayList(Pattern.compile(topicPattern)));
+    if (kafkaTopics.isEmpty()) {
+      try {
+        _kafkaConsumer.close();
+      } catch (Exception ignored) {
+        // Best-effort cleanup: construction is failing anyway.
+      }
+      throw new IllegalArgumentException("No Kafka topic matches "
+          + SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY + " = " + topicPattern);
+    }
+    _partitions = kafkaTopics.get(0).getPartitions();
+    _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
+    _nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
+    _highWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
+
+    InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
+    _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null);
+    _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$);
+    _versionWriter = new FixedSchemaVersionWriter();
+  }
+
+  public SimpleKafkaSpecConsumer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public SimpleKafkaSpecConsumer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  /**
+   * Reads every partition from where the previous call stopped (or from the earliest available
+   * offset on the first call) up to the latest offset observed at the start of this call.
+   *
+   * <p>A partition whose fetch fails is skipped for this call; its next offset is left unchanged, so
+   * the next call resumes from the same place. Records that cannot be decoded are logged and skipped.
+   *
+   * @return an already-completed future holding the (verb, spec) changes in consumption order
+   */
+  @Override
+  public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
+    List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>();
+    initializeWatermarks();
+    this.currentPartitionIdx = -1;
+    while (!allPartitionsFinished()) {
+      if (currentPartitionFinished()) {
+        moveToNextPartition();
+        continue;
+      }
+      if (this.messageIterator == null || !this.messageIterator.hasNext()) {
+        try {
+          this.messageIterator = fetchNextMessageBuffer();
+        } catch (Exception e) {
+          log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
+              getCurrentPartition()), e);
+          moveToNextPartition();
+          continue;
+        }
+        if (this.messageIterator == null || !this.messageIterator.hasNext()) {
+          moveToNextPartition();
+          continue;
+        }
+      }
+      while (!currentPartitionFinished() && this.messageIterator.hasNext()) {
+        KafkaConsumerRecord nextValidMessage = this.messageIterator.next();
+
+        // Even though we ask Kafka to give us a message buffer starting from offset x, it may
+        // return a buffer that starts from offset smaller than x, so we need to skip messages
+        // until we get to x.
+        if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) {
+          continue;
+        }
+
+        _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
+        try {
+          changesSpecs.add(toSpecChange(nextValidMessage));
+        } catch (Exception e) {
+          // Log the cause too; previously it was dropped, hiding why the record was rejected.
+          log.error("Could not decode record at partition " + this.currentPartitionIdx
+              + " offset " + nextValidMessage.getOffset(), e);
+        }
+      }
+    }
+
+    return new CompletedFuture<>(changesSpecs, null);
+  }
+
+  /**
+   * Converts one consumed record into the (verb, {@link JobSpec}) pair it describes. The verb name is
+   * read from the record metadata under {@link SimpleKafkaSpecExecutor#VERB_KEY}.
+   */
+  @SuppressWarnings("unchecked")
+  private Pair<SpecExecutor.Verb, Spec> toSpecChange(KafkaConsumerRecord message) throws IOException {
+    final AvroJobSpec record;
+
+    if (message instanceof ByteArrayBasedKafkaRecord) {
+      record = decodeRecord((ByteArrayBasedKafkaRecord) message);
+    } else if (message instanceof DecodeableKafkaRecord) {
+      // Assumes the client was configured with a value deserializer producing AvroJobSpec
+      // (e.g. AvroJobSpecDeserializer) — the cast is unchecked.
+      record = ((DecodeableKafkaRecord<?, AvroJobSpec>) message).getValue();
+    } else {
+      throw new IllegalStateException(
+          "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
+              + " or DecodeableKafkaRecord");
+    }
+
+    JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri());
+
+    Properties props = new Properties();
+    props.putAll(record.getProperties());
+    jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion())
+        .withDescription(record.getDescription()).withConfigAsProperties(props);
+
+    String templateUri = record.getTemplateUri();
+    if (templateUri != null && !templateUri.isEmpty()) {
+      jobSpecBuilder.withTemplate(URI.create(templateUri));
+    }
+
+    String verbName = record.getMetadata().get(VERB_KEY);
+    SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
+
+    return new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpecBuilder.build());
+  }
+
+  private void initializeWatermarks() {
+    initializeLowWatermarks();
+    initializeHighWatermarks();
+  }
+
+  /**
+   * On the first run, sets each partition's low and next offsets to the earliest available offset.
+   * On later runs, the low watermark becomes the previous run's high watermark.
+   */
+  private void initializeLowWatermarks() {
+    try {
+      int i = 0;
+      for (KafkaPartition kafkaPartition : _partitions) {
+        if (isFirstRun) {
+          long earliestOffset = _kafkaConsumer.getEarliestOffset(kafkaPartition);
+          _lowWatermark.set(i, earliestOffset);
+          // Start reading at the earliest available offset instead of 0: once older records have
+          // been removed from the partition, offset 0 may no longer be fetchable.
+          _nextWatermark.set(i, earliestOffset);
+        } else {
+          _lowWatermark.set(i, _highWatermark.get(i));
+        }
+        i++;
+      }
+      isFirstRun = false;
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Records the current latest offset of every partition as this run's upper bound. */
+  private void initializeHighWatermarks() {
+    try {
+      int i = 0;
+      for (KafkaPartition kafkaPartition : _partitions) {
+        long latestOffset = _kafkaConsumer.getLatestOffset(kafkaPartition);
+        _highWatermark.set(i, latestOffset);
+        i++;
+      }
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private boolean allPartitionsFinished() {
+    return this.currentPartitionIdx >= _nextWatermark.size();
+  }
+
+  private boolean currentPartitionFinished() {
+    return this.currentPartitionIdx == -1
+        || _nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx);
+  }
+
+  private void moveToNextPartition() {
+    this.messageIterator = null;
+    this.currentPartitionIdx++;
+  }
+
+  private KafkaPartition getCurrentPartition() {
+    return _partitions.get(this.currentPartitionIdx);
+  }
+
+  private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() {
+    return _kafkaConsumer.consume(_partitions.get(this.currentPartitionIdx),
+        _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx));
+  }
+
+  /** Strips the schema-version header, then Avro-decodes the remaining bytes. */
+  private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
+    InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes());
+    _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));
+
+    Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);
+
+    return _reader.read(null, decoder);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _kafkaConsumer.close();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
new file mode 100644
index 0000000..c3dfcb3
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+import org.apache.gobblin.util.CompletedFuture;
+
+/**
+ * A {@link SpecExecutor} that uses Kafka as its communication channel. Specs are published through a
+ * {@link SimpleKafkaSpecProducer} built from the executor's configuration.
+ */
+public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor {
+  /** Config key holding the pattern used to select the Kafka topic(s) that carry specs. */
+  public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics";
+
+  /** Key in the Avro record metadata under which the {@link SpecExecutor.Verb} name is stored. */
+  protected static final String VERB_KEY = "Verb";
+
+  private SpecProducer<Spec> specProducer;
+
+  public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) {
+    super(config, log);
+    this.specProducer = new SimpleKafkaSpecProducer(config, log);
+  }
+
+  /**
+   * Creates an executor without a logger, for simple use cases.
+   *
+   * @param config executor configuration
+   */
+  public SimpleKafkaSpecExecutor(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public Future<? extends SpecProducer> getProducer() {
+    SpecProducer<Spec> producer = this.specProducer;
+    return new CompletedFuture<>(producer, null);
+  }
+
+  @Override
+  public Future<String> getDescription() {
+    String description = "SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri;
+    return new CompletedFuture<>(description, null);
+  }
+
+  /** Creates a fresh {@link Closer} and registers the producer with it so that shutdown closes it. */
+  @Override
+  protected void startUp() throws Exception {
+    Closer closer = Closer.create();
+    optionalCloser = Optional.of(closer);
+    specProducer = closer.register((SimpleKafkaSpecProducer) specProducer);
+  }
+
+  /** Closes everything registered in {@link #startUp()}, or only warns if no closer exists. */
+  @Override
+  protected void shutDown() throws Exception {
+    if (!optionalCloser.isPresent()) {
+      log.warn("There's no Closer existed in " + this.getClass().getName());
+      return;
+    }
+    optionalCloser.get().close();
+  }
+
+  /** A serializable holder for a verb, the affected spec URI and the spec itself. */
+  public static class SpecExecutorInstanceDataPacket implements Serializable {
+
+    protected SpecExecutor.Verb _verb;
+    protected URI _uri;
+    protected Spec _spec;
+
+    public SpecExecutorInstanceDataPacket(SpecExecutor.Verb verb, URI uri, Spec spec) {
+      this._verb = verb;
+      this._uri = uri;
+      this._spec = spec;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Verb: %s, URI: %s, Spec: %s", this._verb, this._uri, this._spec);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
new file mode 100644
index 0000000..a5163db
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.WriteCallback;
+
+import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY;
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A {@link SpecProducer} that publishes specs to Kafka as serialized {@link AvroJobSpec} records.
+ * Each record carries the {@link SpecExecutor.Verb} (ADD, UPDATE or DELETE) in its metadata under
+ * {@link SimpleKafkaSpecExecutor#VERB_KEY}.
+ *
+ * <p>The Kafka writer is an {@link AsyncDataWriter} whose class name is read from
+ * {@code spec.kafka.dataWriterClass} (the Kafka 0.8 writer by default). It is created lazily on the
+ * first write, and its constructor receives the full configuration converted to properties.
+ */
+@Slf4j
+@NotThreadSafe
+public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
+  private static final String KAFKA_DATA_WRITER_CLASS_KEY = "spec.kafka.dataWriterClass";
+  private static final String DEFAULT_KAFKA_DATA_WRITER_CLASS =
+      "org.apache.gobblin.kafka.writer.Kafka08DataWriter";
+
+  // Producer; null until the first write (see getKafkaProducer()).
+  protected AsyncDataWriter<byte[]> _kafkaProducer;
+  private final AvroSerializer<AvroJobSpec> _serializer;
+  private final Config _config;
+  private final String _kafkaProducerClassName;
+
+  /**
+   * @param config configuration; also passed (as properties) to the Kafka writer when it is created
+   * @param log not used by this class
+   * @throws RuntimeException if the Avro serializer cannot be created
+   */
+  public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) {
+    _config = config;
+    _kafkaProducerClassName = ConfigUtils.getString(config, KAFKA_DATA_WRITER_CLASS_KEY,
+        DEFAULT_KAFKA_DATA_WRITER_CLASS);
+
+    try {
+      _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter());
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create AvroBinarySerializer", e);
+    }
+  }
+
+  public SimpleKafkaSpecProducer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public SimpleKafkaSpecProducer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
+
+    log.info("Adding Spec: " + addedSpec + " using Kafka.");
+
+    return writeAvroJobSpec(avroJobSpec);
+  }
+
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
+
+    log.info("Updating Spec: " + updatedSpec + " using Kafka.");
+
+    return writeAvroJobSpec(avroJobSpec);
+  }
+
+  /** Publishes a record that carries only the URI and the DELETE verb. */
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI) {
+
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
+        .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name())).build();
+
+    log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
+
+    return writeAvroJobSpec(avroJobSpec);
+  }
+
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Closes the Kafka writer if one was created. The writer is created lazily, so it is still null
+   * when no spec was ever written; closing it unconditionally used to throw a NullPointerException.
+   */
+  @Override
+  public void close() throws IOException {
+    if (_kafkaProducer != null) {
+      _kafkaProducer.close();
+    }
+  }
+
+  /** Serializes {@code avroJobSpec} and hands it to the (lazily created) Kafka writer. */
+  private Future<?> writeAvroJobSpec(AvroJobSpec avroJobSpec) {
+    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+  }
+
+  /** Returns the Kafka writer, instantiating it reflectively on first use. */
+  @SuppressWarnings("unchecked")
+  private AsyncDataWriter<byte[]> getKafkaProducer() {
+    if (null == _kafkaProducer) {
+      try {
+        Class<?> kafkaProducerClass = Class.forName(_kafkaProducerClassName);
+        _kafkaProducer = (AsyncDataWriter<byte[]>) ConstructorUtils.invokeConstructor(kafkaProducerClass,
+            ConfigUtils.configToProperties(_config));
+      } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
+          | InvocationTargetException e) {
+        log.error("Failed to instantiate Kafka producer from class " + _kafkaProducerClassName, e);
+
+        throw new RuntimeException("Failed to instantiate Kafka producer", e);
+      }
+    }
+    return _kafkaProducer;
+  }
+
+  /**
+   * Converts a {@link JobSpec} into an {@link AvroJobSpec} tagged with {@code verb}.
+   *
+   * @throws RuntimeException if {@code spec} is not a {@link JobSpec}
+   */
+  private AvroJobSpec convertToAvroJobSpec(Spec spec, SpecExecutor.Verb verb) {
+    if (!(spec instanceof JobSpec)) {
+      throw new RuntimeException("Unsupported spec type " + spec.getClass());
+    }
+
+    JobSpec jobSpec = (JobSpec) spec;
+    AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder();
+
+    avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion())
+        .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
+        .setMetadata(ImmutableMap.of(VERB_KEY, verb.name()));
+
+    if (jobSpec.getTemplateURI().isPresent()) {
+      avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString());
+    }
+
+    return avroJobSpecBuilder.build();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
new file mode 100644
index 0000000..7d7b702
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
+import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * {@link SpecConsumer} that consumes job specs from Kafka in a streaming manner.
+ *
+ * <p>An {@link AvroJobSpecKafkaJobMonitor} created from the supplied config applies spec changes to the supplied
+ * {@link MutableJobCatalog}. A {@link JobSpecListener} registered on that catalog turns each catalog change into a
+ * (verb, spec) pair on a bounded blocking queue, which {@link #changedSpecs()} drains.
+ *
+ * <p>Implements {@link AbstractIdleService} so that starting up and shutting down this service starts and stops
+ * the job monitor.
+ */
+@Slf4j
+public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable {
+  /** Config key for the capacity of the queue that buffers spec changes until {@link #changedSpecs()} reads them. */
+  public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize";
+  private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
+  private final AvroJobSpecKafkaJobMonitor _jobMonitor;
+  private final BlockingQueue<ImmutablePair<SpecExecutor.Verb, Spec>> _jobSpecQueue;
+
+  /**
+   * Creates the consumer, builds its Kafka job monitor and registers a listener on {@code jobCatalog}.
+   *
+   * @param config must contain {@code SPEC_KAFKA_TOPICS_KEY}; may set {@link #SPEC_STREAMING_BLOCKING_QUEUE_SIZE}.
+   *               It is also handed to the job monitor factory, with the topic and an auto-offset-reset of
+   *               "smallest" supplied as fallbacks.
+   * @param jobCatalog catalog passed to the job monitor and on which this consumer listens for changes
+   * @param log not used by this constructor; logging goes through the class logger
+   * @throws RuntimeException if the job monitor cannot be created
+   */
+  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) {
+    String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
+    Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic,
+        KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST));
+
+    try {
+      // withFallback: values explicitly present in config take precedence over these defaults.
+      _jobMonitor = (AvroJobSpecKafkaJobMonitor) (new AvroJobSpecKafkaJobMonitor.Factory())
+          .forConfig(config.withFallback(defaults), jobCatalog);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create job monitor", e);
+    }
+
+    // Read the size from the public key constant. Previously the literal "SPEC_STREAMING_BLOCKING_QUEUE_SIZE" was
+    // used, so setting the documented key had no effect.
+    _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, SPEC_STREAMING_BLOCKING_QUEUE_SIZE,
+        DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
+
+    // listener will add job specs to a blocking queue to send to callers of changedSpecs()
+    jobCatalog.addListener(new JobSpecListener());
+  }
+
+  /**
+   * Creates the consumer with the given logger.
+   *
+   * @param log passed through to the main constructor, which does not currently use it
+   */
+  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) {
+    this(config, jobCatalog, Optional.of(log));
+  }
+
+  /** Creates the consumer without a caller-supplied logger. */
+  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog) {
+    this(config, jobCatalog, Optional.<Logger>absent());
+  }
+
+  /**
+   * Returns the job spec changes received from Kafka. Blocks until at least one change is available.
+   *
+   * <p>After the first change arrives, any further changes already queued are returned in the same batch. If the
+   * calling thread is interrupted while waiting, its interrupt flag is restored and whatever has been collected
+   * (possibly nothing) is returned.
+   *
+   * @return an already-completed future holding the list of (verb, spec) pairs
+   */
+  @Override
+  public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
+    List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>();
+
+    try {
+      changesSpecs.add(_jobSpecQueue.take());
+      // if there are more elements then pass them along in this call
+      _jobSpecQueue.drainTo(changesSpecs);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    return new CompletedFuture<List<Pair<SpecExecutor.Verb, Spec>>>(changesSpecs, null);
+  }
+
+  /** Starts the Kafka job monitor and waits until it is running. */
+  @Override
+  protected void startUp() {
+    _jobMonitor.startAsync().awaitRunning();
+  }
+
+  /** Stops the Kafka job monitor and waits until it has terminated. */
+  @Override
+  protected void shutDown() {
+    _jobMonitor.stopAsync().awaitTerminated();
+  }
+
+  /**
+   * Stops the Kafka job monitor.
+   *
+   * <p>NOTE(review): this invokes {@link #shutDown()} directly instead of {@code stopAsync()}, so the state of this
+   * service itself is not transitioned. It is kept as-is to preserve existing behavior.
+   */
+  @Override
+  public void close() throws IOException {
+    shutDown();
+  }
+
+  /**
+   * JobCatalog listener that puts (verb, spec) pairs into the blocking queue consumed by
+   * {@link StreamingKafkaSpecConsumer#changedSpecs()}.
+   *
+   * <p>The queue is bounded and {@code put} blocks when it is full. The thread delivering a catalog callback
+   * therefore waits until {@code changedSpecs()} makes room.
+   */
+  protected class JobSpecListener extends DefaultJobCatalogListenerImpl {
+    public JobSpecListener() {
+      super(StreamingKafkaSpecConsumer.this.log);
+    }
+
+    @Override
+    public void onAddJob(JobSpec addedJob) {
+      super.onAddJob(addedJob);
+      enqueue(SpecExecutor.Verb.ADD, addedJob);
+    }
+
+    @Override
+    public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
+      super.onDeleteJob(deletedJobURI, deletedJobVersion);
+
+      // Only the URI and version of a deleted job are known here, so forward a spec with empty properties.
+      JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI);
+      Properties props = new Properties();
+      jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props);
+
+      enqueue(SpecExecutor.Verb.DELETE, jobSpecBuilder.build());
+    }
+
+    @Override
+    public void onUpdateJob(JobSpec updatedJob) {
+      super.onUpdateJob(updatedJob);
+      enqueue(SpecExecutor.Verb.UPDATE, updatedJob);
+    }
+
+    /**
+     * Queues one change for changedSpecs(). Blocks while the queue is full. If interrupted, the change is dropped
+     * and the thread's interrupt flag is restored.
+     */
+    private void enqueue(SpecExecutor.Verb verb, Spec spec) {
+      try {
+        _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, spec));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
\ No newline at end of file
[2/2] incubator-gobblin git commit: Merge pull request #2144 from
htran1/gobblin-service-09
Posted by hu...@apache.org.
Merge pull request #2144 from htran1/gobblin-service-09
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0ee3cfdc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0ee3cfdc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0ee3cfdc
Branch: refs/heads/master
Commit: 0ee3cfdca9dd3edb9d6a45fbffeb186fb10101d1
Parents: 8f32ab4 590b872
Author: Hung Tran <hu...@linkedin.com>
Authored: Thu Oct 19 17:07:32 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Oct 19 17:07:32 2017 -0700
----------------------------------------------------------------------
gobblin-modules/gobblin-kafka-08/build.gradle | 1 +
.../service/SimpleKafkaSpecConsumer.java | 264 -----------------
.../service/SimpleKafkaSpecExecutor.java | 105 -------
.../service/SimpleKafkaSpecProducer.java | 140 ---------
.../service/StreamingKafkaSpecConsumer.java | 173 -----------
.../kafka/client/Kafka09ConsumerClient.java | 19 +-
.../client/AbstractBaseKafkaConsumerClient.java | 3 +-
.../gobblin-service-kafka/build.gradle | 44 +++
.../service/AvroJobSpecDeserializer.java | 70 +++++
.../service/SimpleKafkaSpecConsumer.java | 287 +++++++++++++++++++
.../service/SimpleKafkaSpecExecutor.java | 102 +++++++
.../service/SimpleKafkaSpecProducer.java | 157 ++++++++++
.../service/StreamingKafkaSpecConsumer.java | 175 +++++++++++
13 files changed, 855 insertions(+), 685 deletions(-)
----------------------------------------------------------------------