You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/08 19:34:00 UTC
nifi git commit: NIFI-2192: Fixed OOM issue in KafkaPublisher
Repository: nifi
Updated Branches:
refs/heads/master 22f72c3d2 -> 7a901952b
NIFI-2192: Fixed OOM issue in KafkaPublisher
This closes #618.
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a901952
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a901952
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a901952
Branch: refs/heads/master
Commit: 7a901952b5e9bb2ce711c6b85305fc1382354727
Parents: 22f72c3
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Jul 8 08:22:24 2016 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jul 8 15:33:49 2016 -0400
----------------------------------------------------------------------
.../nifi/processors/kafka/KafkaPublisher.java | 31 +++++++--
.../processors/kafka/pubsub/KafkaPublisher.java | 31 +++++++--
.../kafka/pubsub/PublishKafkaTest.java | 67 +++++++++++++++++---
.../kafka/pubsub/StubPublishKafka.java | 11 ++++
4 files changed, 122 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a901952/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index bf89d22..561f36b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -56,6 +56,12 @@ class KafkaPublisher implements Closeable {
private final Partitioner partitioner;
+ private final int ackCheckSize;
+
+ KafkaPublisher(Properties kafkaProperties) {
+ this(kafkaProperties, 100);
+ }
+
/**
* Creates an instance of this class as well as the instance of the
* corresponding Kafka {@link KafkaProducer} using provided Kafka
@@ -65,10 +71,11 @@ class KafkaPublisher implements Closeable {
* instance of {@link Properties} used to bootstrap
* {@link KafkaProducer}
*/
- KafkaPublisher(Properties kafkaProperties) {
+ KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+ this.ackCheckSize = ackCheckSize;
try {
if (kafkaProperties.containsKey("partitioner.class")) {
this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance();
@@ -117,7 +124,9 @@ class KafkaPublisher implements Closeable {
byte[] messageBytes;
int tokenCounter = 0;
- for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+ boolean continueSending = true;
+ KafkaPublisherResult result = null;
+ for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
if (prevLastAckedMessageIndex < tokenCounter) {
Integer partitionId = publishingContext.getPartitionId();
if (partitionId == null && publishingContext.getKeyBytes() != null) {
@@ -126,11 +135,25 @@ class KafkaPublisher implements Closeable {
ProducerRecord<byte[], byte[]> message =
new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getPartitionId(), publishingContext.getKeyBytes(), messageBytes);
resultFutures.add(this.kafkaProducer.send(message));
+
+ if (tokenCounter % this.ackCheckSize == 0) {
+ int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+ resultFutures.clear();
+ if (lastAckedMessageIndex % this.ackCheckSize != 0) {
+ continueSending = false;
+ result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+ }
+ prevLastAckedMessageIndex = lastAckedMessageIndex;
+ }
}
}
- int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
- return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+ if (result == null) {
+ int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+ resultFutures.clear();
+ result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+ }
+ return result;
}
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a901952/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
index 79dfce7..366efef 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -50,6 +50,12 @@ class KafkaPublisher implements Closeable {
private volatile ComponentLog processLog;
+ private final int ackCheckSize;
+
+ KafkaPublisher(Properties kafkaProperties) {
+ this(kafkaProperties, 100);
+ }
+
/**
* Creates an instance of this class as well as the instance of the
* corresponding Kafka {@link KafkaProducer} using provided Kafka
@@ -59,8 +65,9 @@ class KafkaPublisher implements Closeable {
* instance of {@link Properties} used to bootstrap
* {@link KafkaProducer}
*/
- KafkaPublisher(Properties kafkaProperties) {
+ KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+ this.ackCheckSize = ackCheckSize;
}
/**
@@ -100,15 +107,31 @@ class KafkaPublisher implements Closeable {
byte[] messageBytes;
int tokenCounter = 0;
- for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+ boolean continueSending = true;
+ KafkaPublisherResult result = null;
+ for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
if (prevLastAckedMessageIndex < tokenCounter) {
ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
resultFutures.add(this.kafkaProducer.send(message));
+
+ if (tokenCounter % this.ackCheckSize == 0){
+ int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+ resultFutures.clear();
+ if (lastAckedMessageIndex % this.ackCheckSize != 0) {
+ continueSending = false;
+ result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+ }
+ prevLastAckedMessageIndex = lastAckedMessageIndex;
+ }
}
}
- int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
- return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+ if (result == null) {
+ int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+ resultFutures.clear();
+ result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+ }
+ return result;
}
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a901952/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index e669df7..af550b4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -81,7 +81,7 @@ public class PublishKafkaTest {
@Test
public void validateSingleCharacterDemarcatedMessages() {
String topicName = "validateSingleCharacterDemarcatedMessages";
- StubPublishKafka putKafka = new StubPublishKafka();
+ StubPublishKafka putKafka = new StubPublishKafka(100);
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -99,9 +99,9 @@ public class PublishKafkaTest {
@SuppressWarnings("unchecked")
@Test
- public void validateMultiCharacterDemarcatedMessagesAndCustomPartitioner() {
+ public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
- StubPublishKafka putKafka = new StubPublishKafka();
+ StubPublishKafka putKafka = new StubPublishKafka(100);
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -121,9 +121,56 @@ public class PublishKafkaTest {
@SuppressWarnings("unchecked")
@Test
- public void validateOnSendFailureAndThenResendSuccess() throws Exception {
+ public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
+ String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+ StubPublishKafka putKafka = new StubPublishKafka(1);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka.TOPIC, topicName);
+ runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+ runner.setProperty(PublishKafka.KEY, "key1");
+ runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
+ runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
+
+ runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
+ String topicName = "validateSendFailureAndThenResendSuccess";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka.TOPIC, topicName);
+ runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+ runner.setProperty(PublishKafka.KEY, "key1");
+ runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
+ runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
+
+ final String text = "Hello World\nGoodbye\nfail\n2";
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
String topicName = "validateSendFailureAndThenResendSuccess";
- StubPublishKafka putKafka = new StubPublishKafka();
+ StubPublishKafka putKafka = new StubPublishKafka(1);
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
@@ -148,7 +195,7 @@ public class PublishKafkaTest {
@Test
public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
String topicName = "validateSendFailureAndThenResendSuccess";
- StubPublishKafka putKafka = new StubPublishKafka();
+ StubPublishKafka putKafka = new StubPublishKafka(100);
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
@@ -177,7 +224,7 @@ public class PublishKafkaTest {
@Test
public void validateDemarcationIntoEmptyMessages() {
String topicName = "validateDemarcationIntoEmptyMessages";
- StubPublishKafka putKafka = new StubPublishKafka();
+ StubPublishKafka putKafka = new StubPublishKafka(100);
final TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
runner.setProperty(PublishKafka.KEY, "key1");
@@ -197,7 +244,7 @@ public class PublishKafkaTest {
@Test
public void validateComplexRightPartialDemarcatedMessages() {
String topicName = "validateComplexRightPartialDemarcatedMessages";
- StubPublishKafka putKafka = new StubPublishKafka();
+ StubPublishKafka putKafka = new StubPublishKafka(100);
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -216,7 +263,7 @@ public class PublishKafkaTest {
@Test
public void validateComplexLeftPartialDemarcatedMessages() {
String topicName = "validateComplexLeftPartialDemarcatedMessages";
- StubPublishKafka putKafka = new StubPublishKafka();
+ StubPublishKafka putKafka = new StubPublishKafka(100);
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
runner.setProperty(PublishKafka.CLIENT_ID, "foo");
@@ -236,7 +283,7 @@ public class PublishKafkaTest {
@Test
public void validateComplexPartialMatchDemarcatedMessages() {
String topicName = "validateComplexPartialMatchDemarcatedMessages";
- StubPublishKafka putKafka = new StubPublishKafka();
+ StubPublishKafka putKafka = new StubPublishKafka(100);
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
runner.setProperty(PublishKafka.CLIENT_ID, "foo");
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a901952/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
index 8e41173..2236f30 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -43,6 +43,12 @@ public class StubPublishKafka extends PublishKafka {
private volatile boolean failed;
+ private final int ackCheckSize;
+
+ StubPublishKafka(int ackCheckSize) {
+ this.ackCheckSize = ackCheckSize;
+ }
+
public Producer<byte[], byte[]> getProducer() {
return producer;
}
@@ -65,7 +71,12 @@ public class StubPublishKafka extends PublishKafka {
Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
kf.setAccessible(true);
kf.set(publisher, producer);
+
+ Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
+ ackCheckSizeField.setAccessible(true);
+ ackCheckSizeField.set(publisher, this.ackCheckSize);
} catch (Exception e) {
+ e.printStackTrace();
throw new IllegalStateException(e);
}
return publisher;