Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/20 14:56:10 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-684] Ensure buffered messages are flushed before close() in K…

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ebbc153  [GOBBLIN-684] Ensure buffered messages are flushed before close() in K…
ebbc153 is described below

commit ebbc153463e01157a562d75a4e5d11b2428715df
Author: suvasude <su...@linkedin.biz>
AuthorDate: Wed Feb 20 06:56:07 2019 -0800

    [GOBBLIN-684] Ensure buffered messages are flushed before close() in K…
    
    Closes #2556 from sv2000/kafkaFlush
---
 .../metrics/kafka/KafkaKeyValueProducerPusher.java | 54 +++++++++++++++++++++-
 .../gobblin/metrics/kafka/KafkaProducerPusher.java | 52 ++++++++++++++++++++-
 .../metrics/kafka/KafkaKeyValueProducerPusher.java |  8 +++-
 .../gobblin/metrics/kafka/KafkaProducerPusher.java |  4 ++
 .../gobblin/runtime/AbstractJobLauncher.java       | 23 ++++-----
 5 files changed, 121 insertions(+), 20 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
index e9ea3ab..c269da7 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -20,11 +20,15 @@ package org.apache.gobblin.metrics.kafka;
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
@@ -44,9 +48,21 @@ import org.apache.gobblin.util.ConfigUtils;
  */
 @Slf4j
 public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
+  private static final long DEFAULT_MAX_NUM_FUTURES_TO_BUFFER = 1000L;
+  //Config key for the low watermark on the size of the futures queue that triggers flushing of messages.
+  private static final String MAX_NUM_FUTURES_TO_BUFFER_KEY = "numFuturesToBuffer";
+
   private final String topic;
   private final KafkaProducer<K, V> producer;
   private final Closer closer;
+  //Queue to keep track of the futures returned by Kafka's asynchronous send() calls. The futures queue is used
+  // to mimic the flush() call available in Kafka 0.9 and later. Currently, there is no
+  // capacity limit on the futures queue. In general, if queue capacity is enforced, a safe lower bound for the
+  // capacity is MAX_NUM_FUTURES_TO_BUFFER + (numThreads * maxNumMessagesPerInterval), where numThreads is the number of
+  // threads sharing the producer instance and maxNumMessagesPerInterval is the estimated maximum number of messages
+  // emitted by a thread per reporting interval.
+  private final Queue<Future<RecordMetadata>> futures = new LinkedBlockingDeque<>();
+  private long numFuturesToBuffer = DEFAULT_MAX_NUM_FUTURES_TO_BUFFER;
 
   public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
     this.closer = Closer.create();
@@ -61,10 +77,12 @@ public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
     props.put(ProducerConfig.RETRIES_CONFIG, 3);
     //To guarantee ordered delivery, the maximum in flight requests must be set to 1.
     props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+    props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
 
     // add the kafka scoped config. if any of the above are specified then they are overridden
     if (kafkaConfig.isPresent()) {
       props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+      this.numFuturesToBuffer = ConfigUtils.getLong(kafkaConfig.get(), MAX_NUM_FUTURES_TO_BUFFER_KEY, DEFAULT_MAX_NUM_FUTURES_TO_BUFFER);
     }
 
     this.producer = createProducer(props);
@@ -80,17 +98,49 @@ public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
    */
   public void pushMessages(List<Pair<K, V>> messages) {
     for (Pair<K, V> message: messages) {
-      this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
+      this.futures.offer(this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
         if (e != null) {
           log.error("Failed to send message to topic {} due to exception: ", topic, e);
         }
-      });
+      }));
+    }
+
+    //Once the low watermark of numFuturesToBuffer is hit, start flushing messages from the futures
+    // buffer. To avoid blocking on the newest messages added to the futures queue, we only invoke future.get() on
+    // the oldest messages in the futures buffer. The number of messages to flush is the same as the number of
+    // messages added in the current call. Note that this does not completely avoid calling future.get() on newer
+    // messages, e.g., when multiple threads enter the if block concurrently and invoke flush().
+    if (this.futures.size() >= this.numFuturesToBuffer) {
+      flush(messages.size());
+    }
+  }
+
+  /**
+   * Flush any records that may be present in the producer buffer, up to a maximum of <code>numRecordsToFlush</code>.
+   * This method is needed since the Kafka 0.8 producer does not have a flush() API. In the absence of a flush()
+   * implementation, records which are present in the buffer but not in flight may not be delivered at all when close()
+   * is called, leading to data loss.
+   * @param numRecordsToFlush the maximum number of records to flush
+   */
+  private void flush(long numRecordsToFlush) {
+    log.debug("Flushing records from producer buffer");
+    Future<RecordMetadata> future;
+    long numRecordsFlushed = 0L;
+    for (; numRecordsFlushed < numRecordsToFlush && (future = futures.poll()) != null; numRecordsFlushed++) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        log.error("Exception encountered when flushing record", e);
+      }
     }
+    log.debug("Flushed {} records from producer buffer", numRecordsFlushed);
   }
 
   @Override
   public void close()
       throws IOException {
+    log.info("Flushing records before close");
+    flush(Long.MAX_VALUE);
     this.closer.close();
   }
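
As background for the two Kafka 0.8 pushers touched by this commit, the buffering pattern can be shown in
isolation. The sketch below is not part of the commit: the class name BoundedFlushingPusher, the LOW_WATERMARK
constant, and the push()/drainOldest() methods are illustrative. It assumes only a producer whose send() returns
a Future<RecordMetadata> and which, unlike Kafka 0.9+, exposes no flush():

    import java.util.Queue;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingDeque;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    //Illustrative sketch (not from the commit): emulates flush() on a pre-0.9 producer
    //by buffering the futures returned by send() and draining the oldest ones first.
    public class BoundedFlushingPusher {
      private static final long LOW_WATERMARK = 1000L;

      private final KafkaProducer<String, byte[]> producer;
      private final String topic;
      //Thread-safe FIFO of per-record futures; the oldest sends sit at the head.
      private final Queue<Future<RecordMetadata>> futures = new LinkedBlockingDeque<>();

      public BoundedFlushingPusher(KafkaProducer<String, byte[]> producer, String topic) {
        this.producer = producer;
        this.topic = topic;
      }

      public void push(byte[] message) {
        //Track the future of every asynchronous send.
        this.futures.offer(this.producer.send(new ProducerRecord<>(this.topic, message)));
        //Past the watermark, block only on the oldest future, so the newest
        //sends keep batching while back-pressure is still applied.
        if (this.futures.size() >= LOW_WATERMARK) {
          drainOldest(1);
        }
      }

      public void close() {
        //Emulate flush(): wait on every outstanding send before closing.
        drainOldest(Long.MAX_VALUE);
        this.producer.close();
      }

      private void drainOldest(long maxToDrain) {
        long drained = 0L;
        Future<RecordMetadata> future;
        while (drained < maxToDrain && (future = this.futures.poll()) != null) {
          try {
            future.get(); //Blocks until the record is acknowledged or fails.
          } catch (Exception e) {
            //A failed send is still no longer outstanding; fall through and count it.
          }
          drained++;
        }
      }
    }

The polling order matters: draining from the head of the queue waits on sends that are most likely already
complete, so the common case returns quickly, while close() falls back to draining everything.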
 
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index 2b98419..2772e4a 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -20,10 +20,14 @@ package org.apache.gobblin.metrics.kafka;
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
@@ -41,10 +45,21 @@ import org.apache.gobblin.util.ConfigUtils;
  */
 @Slf4j
 public class KafkaProducerPusher implements Pusher<byte[]> {
+  private static final long DEFAULT_MAX_NUM_FUTURES_TO_BUFFER = 1000L;
+  //Config key for the low watermark on the size of the futures queue that triggers flushing of messages.
+  private static final String MAX_NUM_FUTURES_TO_BUFFER_KEY = "numFuturesToBuffer";
 
   private final String topic;
   private final KafkaProducer<String, byte[]> producer;
   private final Closer closer;
+  //Queue to keep track of the futures returned by Kafka's asynchronous send() calls. The futures queue is used
+  // to mimic the flush() call available in Kafka 0.9 and later. Currently, there is no
+  // capacity limit on the futures queue. In general, if queue capacity is enforced, a safe lower bound for the
+  // capacity is MAX_NUM_FUTURES_TO_BUFFER + (numThreads * maxNumMessagesPerInterval), where numThreads is the number of
+  // threads sharing the producer instance and maxNumMessagesPerInterval is the estimated maximum number of messages
+  // emitted by a thread per reporting interval.
+  private final Queue<Future<RecordMetadata>> futures = new LinkedBlockingDeque<>();
+  private long numFuturesToBuffer = DEFAULT_MAX_NUM_FUTURES_TO_BUFFER;
 
   public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
     this.closer = Closer.create();
@@ -61,6 +76,7 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
     // add the kafka scoped config. if any of the above are specified then they are overridden
     if (kafkaConfig.isPresent()) {
       props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+      this.numFuturesToBuffer = ConfigUtils.getLong(kafkaConfig.get(), MAX_NUM_FUTURES_TO_BUFFER_KEY, DEFAULT_MAX_NUM_FUTURES_TO_BUFFER);
     }
 
     this.producer = createProducer(props);
@@ -76,17 +92,49 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
    */
   public void pushMessages(List<byte[]> messages) {
     for (byte[] message: messages) {
-      producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
+      this.futures.offer(producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
         if (e != null) {
           log.error("Failed to send message to topic {} due to exception: ", topic, e);
         }
-      });
+      }));
     }
+
+    //Once the low watermark of numFuturesToBuffer is hit, start flushing messages from the futures
+    // buffer. To avoid blocking on the newest messages added to the futures queue, we only invoke future.get() on
+    // the oldest messages in the futures buffer. The number of messages to flush is the same as the number of
+    // messages added in the current call. Note that this does not completely avoid calling future.get() on newer
+    // messages, e.g., when multiple threads enter the if block concurrently and invoke flush().
+    if (this.futures.size() >= this.numFuturesToBuffer) {
+      flush(messages.size());
+    }
+  }
+
+  /**
+   * Flush any records that may be present in the producer buffer, up to a maximum of <code>numRecordsToFlush</code>.
+   * This method is needed since the Kafka 0.8 producer does not have a flush() API. In the absence of a flush()
+   * implementation, records which are present in the buffer but not in flight may not be delivered at all when close()
+   * is called, leading to data loss.
+   * @param numRecordsToFlush the maximum number of records to flush
+   */
+  private void flush(long numRecordsToFlush) {
+    log.debug("Flushing records from producer buffer");
+    Future<RecordMetadata> future;
+    long numRecordsFlushed = 0L;
+    for (; numRecordsFlushed < numRecordsToFlush && (future = futures.poll()) != null; numRecordsFlushed++) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        log.error("Exception encountered when flushing record", e);
+      }
+    }
+    log.debug("Flushed {} records from producer buffer", numRecordsFlushed);
   }
 
   @Override
   public void close()
       throws IOException {
+    log.info("Flushing records before close");
+    flush(Long.MAX_VALUE);
     this.closer.close();
   }
 
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
index e4ad6ca..b411b94 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -32,10 +32,10 @@ import com.google.common.base.Optional;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
-import org.apache.gobblin.util.ConfigUtils;
-
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.util.ConfigUtils;
+
 
 /**
  * Establishes a connection to a Kafka cluster and pushes keyed messages to a specified topic.
@@ -91,6 +91,10 @@ public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
   @Override
   public void close()
       throws IOException {
+    //Call flush() before invoking close() to ensure any buffered messages are immediately sent. This is required
+    //since close() only guarantees delivery of in-flight messages.
+    log.info("Flushing records from producer buffer");
+    this.producer.flush();
     this.closer.close();
   }
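
By contrast, the Kafka 0.9 pushers can lean on the producer's built-in flush(), so the fix reduces to ordering
flush() before close(). A minimal sketch of that close path, using only the public KafkaProducer API (the
FlushingCloser wrapper is illustrative, not from the commit):

    import java.io.Closeable;
    import java.io.IOException;

    import org.apache.kafka.clients.producer.KafkaProducer;

    //Illustrative sketch (not from the commit): flush() blocks until every buffered
    //record has been sent, so nothing is stranded in the accumulator at close().
    public final class FlushingCloser implements Closeable {
      private final KafkaProducer<String, byte[]> producer;

      public FlushingCloser(KafkaProducer<String, byte[]> producer) {
        this.producer = producer;
      }

      @Override
      public void close() throws IOException {
        this.producer.flush(); //Drain the record accumulator first.
        this.producer.close(); //Then release network and buffer resources.
      }
    }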
 
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index 1ef1063..af574c5 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -87,6 +87,10 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
   @Override
   public void close()
       throws IOException {
+    //Call flush() before invoking close() to ensure any buffered messages are immediately sent. This is required
+    //since close() only guarantees delivery of in-flight messages.
+    log.info("Flushing records from producer buffer");
+    this.producer.flush();
     this.closer.close();
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index f0c2891..05603e5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -18,8 +18,6 @@
 package org.apache.gobblin.runtime;
 
 import java.io.IOException;
-import java.net.URI;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -28,10 +26,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.WriterUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -48,12 +42,11 @@ import com.google.common.eventbus.EventBus;
 import com.google.common.io.Closer;
 import com.typesafe.config.ConfigFactory;
 
-import org.apache.gobblin.source.Source;
-import org.apache.gobblin.source.WorkUnitStreamSource;
-import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
-import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import javax.annotation.Nullable;
+import lombok.RequiredArgsConstructor;
+
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.CommitSequence;
 import org.apache.gobblin.commit.CommitSequenceStore;
@@ -79,9 +72,13 @@ import org.apache.gobblin.runtime.locks.JobLockEventListener;
 import org.apache.gobblin.runtime.locks.JobLockException;
 import org.apache.gobblin.runtime.locks.LegacyJobLockFactoryManager;
 import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.WorkUnitStreamSource;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ClusterNameTags;
 import org.apache.gobblin.util.ExecutorsUtils;
@@ -90,9 +87,6 @@ import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
-import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
-
 
 /**
  * An abstract implementation of {@link JobLauncher} that handles common tasks for launching and running a job.
@@ -971,6 +965,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
     } catch (Exception e) {
       throw new JobException("Failed to execute all JobListeners", e);
     } finally {
+      LOG.info("Submitting timer event: {}", timerEventName);
       timer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
           EventName.getEnumFromEventId(timerEventName)));
     }