You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/20 14:56:10 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-684] Ensure buffered messages are flushed before close() in K…
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ebbc153 [GOBBLIN-684] Ensure buffered messages are flushed before close() in K…
ebbc153 is described below
commit ebbc153463e01157a562d75a4e5d11b2428715df
Author: suvasude <su...@linkedin.biz>
AuthorDate: Wed Feb 20 06:56:07 2019 -0800
[GOBBLIN-684] Ensure buffered messages are flushed before close() in K…
Closes #2556 from sv2000/kafkaFlush
---
.../metrics/kafka/KafkaKeyValueProducerPusher.java | 54 +++++++++++++++++++++-
.../gobblin/metrics/kafka/KafkaProducerPusher.java | 52 ++++++++++++++++++++-
.../metrics/kafka/KafkaKeyValueProducerPusher.java | 8 +++-
.../gobblin/metrics/kafka/KafkaProducerPusher.java | 4 ++
.../gobblin/runtime/AbstractJobLauncher.java | 23 ++++-----
5 files changed, 121 insertions(+), 20 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
index e9ea3ab..c269da7 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -20,11 +20,15 @@ package org.apache.gobblin.metrics.kafka;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -44,9 +48,21 @@ import org.apache.gobblin.util.ConfigUtils;
*/
@Slf4j
public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
+ private static final long DEFAULT_MAX_NUM_FUTURES_TO_BUFFER = 1000L;
+ //Config key for the futures-queue size at which pushMessages() starts flushing (waiting on) the oldest sends.
+ private static final String MAX_NUM_FUTURES_TO_BUFFER_KEY = "numFuturesToBuffer";
+
private final String topic;
private final KafkaProducer<K, V> producer;
private final Closer closer;
+ //Queue to keep track of the futures returned by the Kafka asynchronous send() call. The futures queue is used
+ // to mimic the functionality of flush() call (available in Kafka 0.9 and later). Currently, there are no
+ // capacity limits on the size of the futures queue. In general, if queue capacity is enforced, a safe lower bound for queue
+ // capacity is MAX_NUM_FUTURES_TO_BUFFER + (numThreads * maxNumMessagesPerInterval), where numThreads equals the number of
+ // threads sharing the producer instance and maxNumMessagesPerInterval is the estimated maximum number of messages
+ // emitted by a thread per reporting interval.
+ private final Queue<Future<RecordMetadata>> futures = new LinkedBlockingDeque<>();
+ //Flush threshold for the futures queue; defaults to DEFAULT_MAX_NUM_FUTURES_TO_BUFFER and can be overridden via
+ //MAX_NUM_FUTURES_TO_BUFFER_KEY in the kafka config passed to the constructor.
+ private long numFuturesToBuffer = DEFAULT_MAX_NUM_FUTURES_TO_BUFFER;
public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
this.closer = Closer.create();
@@ -61,10 +77,12 @@ public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//To guarantee ordered delivery, the maximum in flight requests must be set to 1.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+ props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
// add the kafka scoped config. if any of the above are specified then they are overridden
if (kafkaConfig.isPresent()) {
props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+ this.numFuturesToBuffer = ConfigUtils.getLong(kafkaConfig.get(), MAX_NUM_FUTURES_TO_BUFFER_KEY, DEFAULT_MAX_NUM_FUTURES_TO_BUFFER);
}
this.producer = createProducer(props);
@@ -80,17 +98,49 @@ public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
*/
public void pushMessages(List<Pair<K, V>> messages) {
for (Pair<K, V> message: messages) {
- this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
+ //Record every send() future so that flush() (and hence close()) can later block until the send completes.
+ this.futures.offer(this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
if (e != null) {
log.error("Failed to send message to topic {} due to exception: ", topic, e);
}
- });
+ }));
+ }
+
+ //Once the futures queue holds at least numFuturesToBuffer entries (a flush-trigger threshold), start flushing
+ // messages from the futures buffer. In order to avoid blocking on newest messages added to futures queue, we only
+ // invoke future.get() on the oldest messages in the futures buffer. The number of messages to flush is same as the
+ // number of messages added in the current call. Note this does not completely avoid calling future.get() on the
+ // newer messages e.g. when multiple threads enter the if{} block concurrently, and invoke flush().
+ if (this.futures.size() >= this.numFuturesToBuffer) {
+ flush(messages.size());
+ }
+ }
+
+ /**
+ * Wait on the oldest send futures in the futures queue, up to a maximum of <code>numRecordsToFlush</code>.
+ * This method is needed since Kafka 0.8 producer does not have a flush() API. In the absence of the flush()
+ * implementation, records which are present in the buffer but not in-flight may not be delivered at all when close()
+ * is called, leading to data loss.
+ *
+ * <p>The limit is checked before a future is dequeued, so no future is removed from the queue without being waited
+ * on. If the calling thread is interrupted, its interrupt status is restored and flushing stops; futures not yet
+ * dequeued stay in the queue for a later flush.
+ *
+ * @param numRecordsToFlush maximum number of futures to dequeue and wait on; {@link Long#MAX_VALUE} drains the queue
+ */
+ private void flush(long numRecordsToFlush) {
+ log.debug("Flushing records from producer buffer");
+ Future<RecordMetadata> future;
+ long numRecordsFlushed = 0L;
+ //Check the limit BEFORE polling: polling first would dequeue (and silently drop) one extra future once the limit
+ //is reached, so a later close() would never wait on the record it tracks.
+ while (numRecordsFlushed < numRecordsToFlush && (future = futures.poll()) != null) {
+ numRecordsFlushed++;
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ //Restore the interrupt status and stop; further get() calls would fail immediately and drop their futures.
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted while flushing records; {} futures left in buffer", futures.size());
+ break;
+ } catch (Exception e) {
+ //Failed or cancelled send: log it and keep flushing the remaining futures.
+ log.error("Exception encountered when flushing record", e);
+ }
}
+ log.debug("Flushed {} records from producer buffer", numRecordsFlushed);
}
@Override
public void close()
throws IOException {
+ //Wait on every outstanding send() future before closing. closer.close() runs in a finally block so whatever it
+ //registered (presumably the producer) is still released if flushing fails unexpectedly.
+ log.info("Flushing records before close");
+ try {
+ flush(Long.MAX_VALUE);
+ } finally {
this.closer.close();
+ }
}
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index 2b98419..2772e4a 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -20,10 +20,14 @@ package org.apache.gobblin.metrics.kafka;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -41,10 +45,21 @@ import org.apache.gobblin.util.ConfigUtils;
*/
@Slf4j
public class KafkaProducerPusher implements Pusher<byte[]> {
+ private static final long DEFAULT_MAX_NUM_FUTURES_TO_BUFFER = 1000L;
+ //Config key for the futures-queue size at which pushMessages() starts flushing (waiting on) the oldest sends.
+ private static final String MAX_NUM_FUTURES_TO_BUFFER_KEY = "numFuturesToBuffer";
private final String topic;
private final KafkaProducer<String, byte[]> producer;
private final Closer closer;
+ //Queue to keep track of the futures returned by the Kafka asynchronous send() call. The futures queue is used
+ // to mimic the functionality of flush() call (available in Kafka 0.9 and later). Currently, there are no
+ // capacity limits on the size of the futures queue. In general, if queue capacity is enforced, a safe lower bound for queue
+ // capacity is MAX_NUM_FUTURES_TO_BUFFER + (numThreads * maxNumMessagesPerInterval), where numThreads equals the number of
+ // threads sharing the producer instance and maxNumMessagesPerInterval is the estimated maximum number of messages
+ // emitted by a thread per reporting interval.
+ private final Queue<Future<RecordMetadata>> futures = new LinkedBlockingDeque<>();
+ //Flush threshold for the futures queue; defaults to DEFAULT_MAX_NUM_FUTURES_TO_BUFFER and can be overridden via
+ //MAX_NUM_FUTURES_TO_BUFFER_KEY in the kafka config passed to the constructor.
+ private long numFuturesToBuffer = DEFAULT_MAX_NUM_FUTURES_TO_BUFFER;
public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
this.closer = Closer.create();
@@ -61,6 +76,7 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
// add the kafka scoped config. if any of the above are specified then they are overridden
if (kafkaConfig.isPresent()) {
props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+ this.numFuturesToBuffer = ConfigUtils.getLong(kafkaConfig.get(), MAX_NUM_FUTURES_TO_BUFFER_KEY, DEFAULT_MAX_NUM_FUTURES_TO_BUFFER);
}
this.producer = createProducer(props);
@@ -76,17 +92,49 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
*/
public void pushMessages(List<byte[]> messages) {
for (byte[] message: messages) {
- producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
+ //Record every send() future so that flush() (and hence close()) can later block until the send completes.
+ this.futures.offer(producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
if (e != null) {
log.error("Failed to send message to topic {} due to exception: ", topic, e);
}
- });
+ }));
}
+
+ //Once the futures queue holds at least numFuturesToBuffer entries (a flush-trigger threshold), start flushing
+ // messages from the futures buffer. In order to avoid blocking on newest messages added to futures queue, we only
+ // invoke future.get() on the oldest messages in the futures buffer. The number of messages to flush is same as the
+ // number of messages added in the current call. Note this does not completely avoid calling future.get() on the
+ // newer messages e.g. when multiple threads enter the if{} block concurrently, and invoke flush().
+ if (this.futures.size() >= this.numFuturesToBuffer) {
+ flush(messages.size());
+ }
+ }
+
+ /**
+ * Wait on the oldest send futures in the futures queue, up to a maximum of <code>numRecordsToFlush</code>.
+ * This method is needed since Kafka 0.8 producer does not have a flush() API. In the absence of the flush()
+ * implementation, records which are present in the buffer but not in-flight may not be delivered at all when close()
+ * is called, leading to data loss.
+ *
+ * <p>The limit is checked before a future is dequeued, so no future is removed from the queue without being waited
+ * on. If the calling thread is interrupted, its interrupt status is restored and flushing stops; futures not yet
+ * dequeued stay in the queue for a later flush.
+ *
+ * @param numRecordsToFlush maximum number of futures to dequeue and wait on; {@link Long#MAX_VALUE} drains the queue
+ */
+ private void flush(long numRecordsToFlush) {
+ log.debug("Flushing records from producer buffer");
+ Future<RecordMetadata> future;
+ long numRecordsFlushed = 0L;
+ //Check the limit BEFORE polling: polling first would dequeue (and silently drop) one extra future once the limit
+ //is reached, so a later close() would never wait on the record it tracks.
+ while (numRecordsFlushed < numRecordsToFlush && (future = futures.poll()) != null) {
+ numRecordsFlushed++;
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ //Restore the interrupt status and stop; further get() calls would fail immediately and drop their futures.
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted while flushing records; {} futures left in buffer", futures.size());
+ break;
+ } catch (Exception e) {
+ //Failed or cancelled send: log it and keep flushing the remaining futures.
+ log.error("Exception encountered when flushing record", e);
+ }
+ }
+ log.debug("Flushed {} records from producer buffer", numRecordsFlushed);
}
@Override
public void close()
throws IOException {
+ //Wait on every outstanding send() future before closing. closer.close() runs in a finally block so whatever it
+ //registered (presumably the producer) is still released if flushing fails unexpectedly.
+ log.info("Flushing records before close");
+ try {
+ flush(Long.MAX_VALUE);
+ } finally {
this.closer.close();
+ }
}
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
index e4ad6ca..b411b94 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -32,10 +32,10 @@ import com.google.common.base.Optional;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
-import org.apache.gobblin.util.ConfigUtils;
-
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.util.ConfigUtils;
+
/**
* Establishes a connection to a Kafka cluster and push keyed messages to a specified topic.
@@ -91,6 +91,10 @@ public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
@Override
public void close()
throws IOException {
+ //Call flush() before invoking close() to ensure any buffered messages are immediately sent. This is required
+ //since close() only guarantees delivery of in-flight messages.
+ log.info("Flushing records from producer buffer");
+ try {
+ this.producer.flush();
+ } finally {
+ //Always release resources, even if flush() throws (e.g. Kafka's unchecked InterruptException on interrupt).
this.closer.close();
+ }
}
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index 1ef1063..af574c5 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -87,6 +87,10 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
@Override
public void close()
throws IOException {
+ //Call flush() before invoking close() to ensure any buffered messages are immediately sent. This is required
+ //since close() only guarantees delivery of in-flight messages.
+ log.info("Flushing records from producer buffer");
+ try {
+ this.producer.flush();
+ } finally {
+ //Always release resources, even if flush() throws (e.g. Kafka's unchecked InterruptException on interrupt).
this.closer.close();
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index f0c2891..05603e5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -18,8 +18,6 @@
package org.apache.gobblin.runtime;
import java.io.IOException;
-import java.net.URI;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -28,10 +26,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.WriterUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -48,12 +42,11 @@ import com.google.common.eventbus.EventBus;
import com.google.common.io.Closer;
import com.typesafe.config.ConfigFactory;
-import org.apache.gobblin.source.Source;
-import org.apache.gobblin.source.WorkUnitStreamSource;
-import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
-import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import javax.annotation.Nullable;
+import lombok.RequiredArgsConstructor;
+
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.CommitSequence;
import org.apache.gobblin.commit.CommitSequenceStore;
@@ -79,9 +72,13 @@ import org.apache.gobblin.runtime.locks.JobLockEventListener;
import org.apache.gobblin.runtime.locks.JobLockException;
import org.apache.gobblin.runtime.locks.LegacyJobLockFactoryManager;
import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ClusterNameTags;
import org.apache.gobblin.util.ExecutorsUtils;
@@ -90,9 +87,6 @@ import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
-import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
-
/**
* An abstract implementation of {@link JobLauncher} that handles common tasks for launching and running a job.
@@ -971,6 +965,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
} catch (Exception e) {
throw new JobException("Failed to execute all JobListeners", e);
} finally {
+ LOG.info("Submitting {}", timerEventName);
timer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
EventName.getEnumFromEventId(timerEventName)));
}