You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/08 19:34:00 UTC

nifi git commit: NIFI-2192: Fixed OOM issue in KafkaPublisher

Repository: nifi
Updated Branches:
  refs/heads/master 22f72c3d2 -> 7a901952b


NIFI-2192: Fixed OOM issue in KafkaPublisher

This closes #618.

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a901952
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a901952
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a901952

Branch: refs/heads/master
Commit: 7a901952b5e9bb2ce711c6b85305fc1382354727
Parents: 22f72c3
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Jul 8 08:22:24 2016 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jul 8 15:33:49 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/kafka/KafkaPublisher.java   | 31 +++++++--
 .../processors/kafka/pubsub/KafkaPublisher.java | 31 +++++++--
 .../kafka/pubsub/PublishKafkaTest.java          | 67 +++++++++++++++++---
 .../kafka/pubsub/StubPublishKafka.java          | 11 ++++
 4 files changed, 122 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7a901952/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index bf89d22..561f36b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -56,6 +56,12 @@ class KafkaPublisher implements Closeable {
 
     private final Partitioner partitioner;
 
+    private final int ackCheckSize;
+
+    KafkaPublisher(Properties kafkaProperties) {
+        this(kafkaProperties, 100);
+    }
+
     /**
      * Creates an instance of this class as well as the instance of the
      * corresponding Kafka {@link KafkaProducer} using provided Kafka
@@ -65,10 +71,11 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link Properties} used to bootstrap
      *            {@link KafkaProducer}
      */
-    KafkaPublisher(Properties kafkaProperties) {
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+        this.ackCheckSize = ackCheckSize;
         try {
             if (kafkaProperties.containsKey("partitioner.class")) {
                 this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance();
@@ -117,7 +124,9 @@ class KafkaPublisher implements Closeable {
 
         byte[] messageBytes;
         int tokenCounter = 0;
-        for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+        boolean continueSending = true;
+        KafkaPublisherResult result = null;
+        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
             if (prevLastAckedMessageIndex < tokenCounter) {
                 Integer partitionId = publishingContext.getPartitionId();
                 if (partitionId == null && publishingContext.getKeyBytes() != null) {
@@ -126,11 +135,25 @@ class KafkaPublisher implements Closeable {
                 ProducerRecord<byte[], byte[]> message =
                     new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getPartitionId(), publishingContext.getKeyBytes(), messageBytes);
                 resultFutures.add(this.kafkaProducer.send(message));
+
+                if (tokenCounter % this.ackCheckSize == 0) {
+                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+                    resultFutures.clear();
+                    if (lastAckedMessageIndex % this.ackCheckSize != 0) {
+                        continueSending = false;
+                        result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+                    }
+                    prevLastAckedMessageIndex = lastAckedMessageIndex;
+                }
             }
         }
 
-        int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
-        return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        if (result == null) {
+            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+            resultFutures.clear();
+            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        }
+        return result;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a901952/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
index 79dfce7..366efef 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -50,6 +50,12 @@ class KafkaPublisher implements Closeable {
 
     private volatile ComponentLog processLog;
 
+    private final int ackCheckSize;
+
+    KafkaPublisher(Properties kafkaProperties) {
+        this(kafkaProperties, 100);
+    }
+
     /**
      * Creates an instance of this class as well as the instance of the
      * corresponding Kafka {@link KafkaProducer} using provided Kafka
@@ -59,8 +65,9 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link Properties} used to bootstrap
      *            {@link KafkaProducer}
      */
-    KafkaPublisher(Properties kafkaProperties) {
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
         this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+        this.ackCheckSize = ackCheckSize;
     }
 
     /**
@@ -100,15 +107,31 @@ class KafkaPublisher implements Closeable {
 
         byte[] messageBytes;
         int tokenCounter = 0;
-        for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+        boolean continueSending = true;
+        KafkaPublisherResult result = null;
+        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
             if (prevLastAckedMessageIndex < tokenCounter) {
                 ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
                 resultFutures.add(this.kafkaProducer.send(message));
+
+                if (tokenCounter % this.ackCheckSize == 0){
+                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+                    resultFutures.clear();
+                    if (lastAckedMessageIndex % this.ackCheckSize != 0) {
+                        continueSending = false;
+                        result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+                    }
+                    prevLastAckedMessageIndex = lastAckedMessageIndex;
+                }
             }
         }
 
-        int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
-        return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        if (result == null) {
+            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+            resultFutures.clear();
+            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        }
+        return result;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a901952/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index e669df7..af550b4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -81,7 +81,7 @@ public class PublishKafkaTest {
     @Test
     public void validateSingleCharacterDemarcatedMessages() {
         String topicName = "validateSingleCharacterDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -99,9 +99,9 @@ public class PublishKafkaTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitioner() {
+    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
         String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -121,9 +121,56 @@ public class PublishKafkaTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void validateOnSendFailureAndThenResendSuccess() throws Exception {
+    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
+        String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+        StubPublishKafka putKafka = new StubPublishKafka(1);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
+
+        runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
+
+        final String text = "Hello World\nGoodbye\nfail\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
         String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(1);
 
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
@@ -148,7 +195,7 @@ public class PublishKafkaTest {
     @Test
     public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
         String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
 
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
@@ -177,7 +224,7 @@ public class PublishKafkaTest {
     @Test
     public void validateDemarcationIntoEmptyMessages() {
         String topicName = "validateDemarcationIntoEmptyMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         final TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.KEY, "key1");
@@ -197,7 +244,7 @@ public class PublishKafkaTest {
     @Test
     public void validateComplexRightPartialDemarcatedMessages() {
         String topicName = "validateComplexRightPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -216,7 +263,7 @@ public class PublishKafkaTest {
     @Test
     public void validateComplexLeftPartialDemarcatedMessages() {
         String topicName = "validateComplexLeftPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -236,7 +283,7 @@ public class PublishKafkaTest {
     @Test
     public void validateComplexPartialMatchDemarcatedMessages() {
         String topicName = "validateComplexPartialMatchDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka();
+        StubPublishKafka putKafka = new StubPublishKafka(100);
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka.TOPIC, topicName);
         runner.setProperty(PublishKafka.CLIENT_ID, "foo");

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a901952/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
index 8e41173..2236f30 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -43,6 +43,12 @@ public class StubPublishKafka extends PublishKafka {
 
     private volatile boolean failed;
 
+    private final int ackCheckSize;
+
+    StubPublishKafka(int ackCheckSize) {
+        this.ackCheckSize = ackCheckSize;
+    }
+
     public Producer<byte[], byte[]> getProducer() {
         return producer;
     }
@@ -65,7 +71,12 @@ public class StubPublishKafka extends PublishKafka {
             Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
             kf.setAccessible(true);
             kf.set(publisher, producer);
+
+            Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
+            ackCheckSizeField.setAccessible(true);
+            ackCheckSizeField.set(publisher, this.ackCheckSize);
         } catch (Exception e) {
+            e.printStackTrace();
             throw new IllegalStateException(e);
         }
         return publisher;