You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/11 18:30:06 UTC

incubator-gobblin git commit: [GOBBLIN-651][GOBBLIN-650] Ensure ordered delivery of Kafka events from KeyValueProducerPusher for kafka-08.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 68f6a1611 -> dc96e3e78


[GOBBLIN-651][GOBBLIN-650] Ensure ordered delivery of Kafka events from KeyValueProducerPusher for kafka-08.

Closes #2520 from sv2000/kafkaOrderedRedux


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/dc96e3e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/dc96e3e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/dc96e3e7

Branch: refs/heads/master
Commit: dc96e3e780358550d997e689bc695cbe19f79996
Parents: 68f6a16
Author: suvasude <su...@linkedin.biz>
Authored: Tue Dec 11 10:30:01 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Dec 11 10:30:01 2018 -0800

----------------------------------------------------------------------
 .../apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java  | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc96e3e7/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
index ec930fc..e9ea3ab 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -59,6 +59,8 @@ public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
     props.put(ProducerConfig.ACKS_CONFIG, "all");
     props.put(ProducerConfig.RETRIES_CONFIG, 3);
+    //To guarantee ordered delivery, the maximum in flight requests must be set to 1.
+    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
 
     // add the kafka scoped config. if any of the above are specified then they are overridden
     if (kafkaConfig.isPresent()) {