You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/10/06 20:12:00 UTC
[5/6] nifi git commit: NIFI-2865: Refactored PublishKafka and
PublishKafka_0_10 to allow batching of FlowFiles within a single publish and
to let messages timeout if not acknowledged
NIFI-2865: Refactored PublishKafka and PublishKafka_0_10 to allow batching of FlowFiles within a single publish and to let messages timeout if not acknowledged
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b9cb6b1b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b9cb6b1b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b9cb6b1b
Branch: refs/heads/master
Commit: b9cb6b1b475eb4688b7cd32f6d343c5dffb20567
Parents: a4ed622
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Sep 8 19:37:35 2016 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Oct 6 15:51:16 2016 -0400
----------------------------------------------------------------------
.../io/exception/TokenTooLargeException.java | 26 +
.../nifi/stream/io/util/StreamDemarcator.java | 38 +-
.../stream/io/util/StreamDemarcatorTest.java | 84 ++-
.../nifi/util/StandardProcessorTestRunner.java | 40 +-
.../java/org/apache/nifi/util/TestRunner.java | 16 +-
.../kafka/pubsub/ConsumeKafka_0_10.java | 2 +-
.../processors/kafka/pubsub/ConsumerPool.java | 4 +-
.../kafka/pubsub/InFlightMessageTracker.java | 178 +++++
.../kafka/pubsub/KafkaProcessorUtils.java | 33 +-
.../processors/kafka/pubsub/KafkaPublisher.java | 236 -------
.../kafka/pubsub/PublishKafka_0_10.java | 652 +++++++------------
.../processors/kafka/pubsub/PublishResult.java | 56 ++
.../processors/kafka/pubsub/PublisherLease.java | 132 ++++
.../processors/kafka/pubsub/PublisherPool.java | 98 +++
.../kafka/pubsub/PublishingContext.java | 124 ----
.../kafka/pubsub/KafkaPublisherTest.java | 306 ---------
.../kafka/pubsub/PublishKafkaTest.java | 375 -----------
.../kafka/pubsub/PublishingContextTest.java | 91 ---
.../kafka/pubsub/StubPublishKafka.java | 143 ----
.../pubsub/TestInFlightMessageTracker.java | 87 +++
.../kafka/pubsub/TestPublishKafka.java | 262 ++++++++
.../kafka/pubsub/TestPublisherLease.java | 194 ++++++
.../kafka/pubsub/TestPublisherPool.java | 68 ++
.../kafka/test/EmbeddedKafkaProducerHelper.java | 110 ----
.../nifi/processors/kafka/KafkaPublisher.java | 4 +-
.../processors/kafka/pubsub/ConsumeKafka.java | 3 +-
.../processors/kafka/pubsub/ConsumerPool.java | 5 +-
.../kafka/pubsub/InFlightMessageTracker.java | 178 +++++
.../kafka/pubsub/KafkaProcessorUtils.java | 33 +-
.../processors/kafka/pubsub/KafkaPublisher.java | 236 -------
.../processors/kafka/pubsub/PublishKafka.java | 639 +++++++-----------
.../processors/kafka/pubsub/PublishResult.java | 56 ++
.../processors/kafka/pubsub/PublisherLease.java | 132 ++++
.../processors/kafka/pubsub/PublisherPool.java | 98 +++
.../kafka/pubsub/PublishingContext.java | 124 ----
.../org.apache.nifi.processor.Processor | 2 +-
.../kafka/pubsub/KafkaPublisherTest.java | 306 ---------
.../kafka/pubsub/PublishKafkaTest.java | 375 -----------
.../kafka/pubsub/PublishingContextTest.java | 91 ---
.../kafka/pubsub/StubPublishKafka.java | 144 ----
.../pubsub/TestInFlightMessageTracker.java | 87 +++
.../kafka/pubsub/TestPublishKafka.java | 262 ++++++++
.../kafka/pubsub/TestPublisherLease.java | 194 ++++++
.../kafka/pubsub/TestPublisherPool.java | 68 ++
.../kafka/test/EmbeddedKafkaProducerHelper.java | 110 ----
45 files changed, 2837 insertions(+), 3665 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
new file mode 100644
index 0000000..7024f34
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stream.io.exception;
+
+import java.io.IOException;
+
+public class TokenTooLargeException extends IOException {
+ public TokenTooLargeException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
index 3064f1c..dc3d829 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
@@ -16,9 +16,12 @@
*/
package org.apache.nifi.stream.io.util;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.nifi.stream.io.exception.TokenTooLargeException;
+
/**
* The <code>StreamDemarcator</code> class takes an input stream and demarcates
* it so it could be read (see {@link #nextToken()}) as individual byte[]
@@ -26,7 +29,7 @@ import java.io.InputStream;
* stream will be read into a single token which may result in
* {@link OutOfMemoryError} if stream is too large.
*/
-public class StreamDemarcator {
+public class StreamDemarcator implements Closeable {
private final static int INIT_BUFFER_SIZE = 8192;
@@ -95,8 +98,10 @@ public class StreamDemarcator {
/**
* Will read the next data token from the {@link InputStream} returning null
* when it reaches the end of the stream.
+ *
+ * @throws IOException if unable to read from the stream
*/
- public byte[] nextToken() {
+ public byte[] nextToken() throws IOException {
byte[] data = null;
int j = 0;
@@ -126,8 +131,10 @@ public class StreamDemarcator {
/**
* Will fill the current buffer from current 'index' position, expanding it
* and or shuffling it if necessary
+ *
+ * @throws IOException if unable to read from the stream
*/
- private void fill() {
+ private void fill() throws IOException {
if (this.index >= this.buffer.length) {
if (this.mark == 0) { // expand
byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
@@ -141,20 +148,16 @@ public class StreamDemarcator {
}
}
- try {
- int bytesRead;
- do {
- bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
- } while (bytesRead == 0);
+ int bytesRead;
+ do {
+ bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
+ } while (bytesRead == 0);
- if (bytesRead != -1) {
- this.readAheadLength = this.index + bytesRead;
- if (this.readAheadLength > this.maxDataSize) {
- throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
- }
+ if (bytesRead != -1) {
+ this.readAheadLength = this.index + bytesRead;
+ if (this.readAheadLength > this.maxDataSize) {
+ throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxDataSize + " bytes.");
}
- } catch (IOException e) {
- throw new IllegalStateException("Failed while reading InputStream", e);
}
}
@@ -188,4 +191,9 @@ public class StreamDemarcator {
throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
}
}
+
+ @Override
+ public void close() throws IOException {
+ is.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
index 93082a2..66d2668 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -65,7 +66,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateNoDelimiter() {
+ public void validateNoDelimiter() throws IOException {
String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, null, 1000);
@@ -76,7 +77,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateNoDelimiterSmallInitialBuffer() {
+ public void validateNoDelimiterSmallInitialBuffer() throws IOException {
String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 1);
@@ -84,7 +85,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateSingleByteDelimiter() {
+ public void validateSingleByteDelimiter() throws IOException {
String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -95,7 +96,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateDelimiterAtTheBeginning() {
+ public void validateDelimiterAtTheBeginning() throws IOException {
String data = ",Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -106,7 +107,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateEmptyDelimiterSegments() {
+ public void validateEmptyDelimiterSegments() throws IOException {
String data = ",,,,,Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -117,7 +118,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateSingleByteDelimiterSmallInitialBuffer() {
+ public void validateSingleByteDelimiterSmallInitialBuffer() throws IOException {
String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000, 2);
@@ -128,7 +129,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteDelimiter() {
+ public void validateWithMultiByteDelimiter() throws IOException {
String data = "foodaabardaabazzz";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
@@ -139,7 +140,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteDelimiterAtTheBeginning() {
+ public void validateWithMultiByteDelimiterAtTheBeginning() throws IOException {
String data = "daafoodaabardaabazzz";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
@@ -150,7 +151,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteDelimiterSmallInitialBuffer() {
+ public void validateWithMultiByteDelimiterSmallInitialBuffer() throws IOException {
String data = "foodaabarffdaabazz";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000, 1);
@@ -161,7 +162,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteCharsNoDelimiter() {
+ public void validateWithMultiByteCharsNoDelimiter() throws IOException {
String data = "\u50e0THIS IS MY NEW TEXT.\u50e0IT HAS A NEWLINE.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, null, 1000);
@@ -172,7 +173,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() {
+ public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() throws IOException {
String data = "\u50e0THIS IS MY NEW TEXT.\u50e0IT HAS A NEWLINE.";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 2);
@@ -183,7 +184,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateWithComplexDelimiter() {
+ public void validateWithComplexDelimiter() throws IOException {
String data = "THIS IS MY TEXT<MYDELIMITER>THIS IS MY NEW TEXT<MYDELIMITER>THIS IS MY NEWEST TEXT";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
StreamDemarcator scanner = new StreamDemarcator(is, "<MYDELIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
@@ -193,8 +194,8 @@ public class StreamDemarcatorTest {
assertNull(scanner.nextToken());
}
- @Test(expected = IllegalStateException.class)
- public void validateMaxBufferSize() {
+ @Test(expected = IOException.class)
+ public void validateMaxBufferSize() throws IOException {
String data = "THIS IS MY TEXT<MY DELIMITER>THIS IS MY NEW TEXT<MY DELIMITER>THIS IS MY NEWEST TEXT";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
StreamDemarcator scanner = new StreamDemarcator(is, "<MY DELIMITER>".getBytes(StandardCharsets.UTF_8), 20);
@@ -202,7 +203,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateScannerHandlesNegativeOneByteInputsNoDelimiter() {
+ public void validateScannerHandlesNegativeOneByteInputsNoDelimiter() throws IOException {
ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
StreamDemarcator scanner = new StreamDemarcator(is, null, 20);
byte[] b = scanner.nextToken();
@@ -210,7 +211,7 @@ public class StreamDemarcatorTest {
}
@Test
- public void validateScannerHandlesNegativeOneByteInputs() {
+ public void validateScannerHandlesNegativeOneByteInputs() throws IOException {
ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
StreamDemarcator scanner = new StreamDemarcator(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024);
byte[] b = scanner.nextToken();
@@ -218,10 +219,59 @@ public class StreamDemarcatorTest {
}
@Test
- public void verifyScannerHandlesNegativeOneByteDelimiter() {
+ public void verifyScannerHandlesNegativeOneByteDelimiter() throws IOException {
ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
StreamDemarcator scanner = new StreamDemarcator(is, new byte[] { -1 }, 20, 1024);
assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0, 0 });
assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0 });
}
+
+ @Test
+ public void testWithoutTrailingDelimiter() throws IOException {
+ final byte[] inputData = "Larger Message First\nSmall".getBytes(StandardCharsets.UTF_8);
+ ByteArrayInputStream is = new ByteArrayInputStream(inputData);
+ StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000);
+
+ final byte[] first = scanner.nextToken();
+ final byte[] second = scanner.nextToken();
+ assertNotNull(first);
+ assertNotNull(second);
+
+ assertEquals("Larger Message First", new String(first, StandardCharsets.UTF_8));
+ assertEquals("Small", new String(second, StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testOnBufferSplitNoTrailingDelimiter() throws IOException {
+ final byte[] inputData = "Yes\nNo".getBytes(StandardCharsets.UTF_8);
+ ByteArrayInputStream is = new ByteArrayInputStream(inputData);
+ StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000, 3);
+
+ final byte[] first = scanner.nextToken();
+ final byte[] second = scanner.nextToken();
+ assertNotNull(first);
+ assertNotNull(second);
+
+ assertArrayEquals(first, new byte[] {'Y', 'e', 's'});
+ assertArrayEquals(second, new byte[] {'N', 'o'});
+ }
+
+ @Test
+ public void testOnBufferSplit() throws IOException {
+ final byte[] inputData = "123\n456\n789".getBytes(StandardCharsets.UTF_8);
+ ByteArrayInputStream is = new ByteArrayInputStream(inputData);
+ StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000, 3);
+
+ final byte[] first = scanner.nextToken();
+ final byte[] second = scanner.nextToken();
+ final byte[] third = scanner.nextToken();
+ assertNotNull(first);
+ assertNotNull(second);
+ assertNotNull(third);
+
+ assertArrayEquals(first, new byte[] {'1', '2', '3'});
+ assertArrayEquals(second, new byte[] {'4', '5', '6'});
+ assertArrayEquals(third, new byte[] {'7', '8', '9'});
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 2a1451a..7c0cc0f 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -369,54 +369,55 @@ public class StandardProcessorTestRunner implements TestRunner {
}
@Override
- public void enqueue(final Path path) throws IOException {
- enqueue(path, new HashMap<String, String>());
+ public MockFlowFile enqueue(final Path path) throws IOException {
+ return enqueue(path, new HashMap<String, String>());
}
@Override
- public void enqueue(final Path path, final Map<String, String> attributes) throws IOException {
+ public MockFlowFile enqueue(final Path path, final Map<String, String> attributes) throws IOException {
final Map<String, String> modifiedAttributes = new HashMap<>(attributes);
if (!modifiedAttributes.containsKey(CoreAttributes.FILENAME.key())) {
modifiedAttributes.put(CoreAttributes.FILENAME.key(), path.toFile().getName());
}
try (final InputStream in = Files.newInputStream(path)) {
- enqueue(in, modifiedAttributes);
+ return enqueue(in, modifiedAttributes);
}
}
@Override
- public void enqueue(final byte[] data) {
- enqueue(data, new HashMap<String, String>());
+ public MockFlowFile enqueue(final byte[] data) {
+ return enqueue(data, new HashMap<String, String>());
}
@Override
- public void enqueue(final String data) {
- enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.emptyMap());
+ public MockFlowFile enqueue(final String data) {
+ return enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.<String, String> emptyMap());
}
@Override
- public void enqueue(final byte[] data, final Map<String, String> attributes) {
- enqueue(new ByteArrayInputStream(data), attributes);
+ public MockFlowFile enqueue(final byte[] data, final Map<String, String> attributes) {
+ return enqueue(new ByteArrayInputStream(data), attributes);
}
@Override
- public void enqueue(final String data, final Map<String, String> attributes) {
- enqueue(data.getBytes(StandardCharsets.UTF_8), attributes);
+ public MockFlowFile enqueue(final String data, final Map<String, String> attributes) {
+ return enqueue(data.getBytes(StandardCharsets.UTF_8), attributes);
}
@Override
- public void enqueue(final InputStream data) {
- enqueue(data, new HashMap<String, String>());
+ public MockFlowFile enqueue(final InputStream data) {
+ return enqueue(data, new HashMap<String, String>());
}
@Override
- public void enqueue(final InputStream data, final Map<String, String> attributes) {
+ public MockFlowFile enqueue(final InputStream data, final Map<String, String> attributes) {
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor);
MockFlowFile flowFile = session.create();
flowFile = session.importFrom(data, flowFile);
flowFile = session.putAllAttributes(flowFile, attributes);
enqueue(flowFile);
+ return flowFile;
}
@Override
@@ -878,17 +879,20 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void assertAllConditionsMet(final Relationship relationship, Predicate<MockFlowFile> predicate) {
- if (predicate==null)
+ if (predicate==null) {
Assert.fail("predicate cannot be null");
+ }
final List<MockFlowFile> flowFiles = getFlowFilesForRelationship(relationship);
- if (flowFiles.isEmpty())
+ if (flowFiles.isEmpty()) {
Assert.fail("Relationship " + relationship.getName() + " does not contain any FlowFile");
+ }
for (MockFlowFile flowFile : flowFiles) {
- if (predicate.test(flowFile)==false)
+ if (predicate.test(flowFile)==false) {
Assert.fail("FlowFile " + flowFile + " does not meet all condition");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 0512416..1c014c3 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -383,7 +383,7 @@ public interface TestRunner {
* @param path to read content from
* @throws IOException if unable to read content
*/
- void enqueue(Path path) throws IOException;
+ MockFlowFile enqueue(Path path) throws IOException;
/**
* Reads the content from the given {@link Path} into memory and creates a
@@ -394,7 +394,7 @@ public interface TestRunner {
* @param attributes attributes to use for new flow file
* @throws IOException if unable to read content
*/
- void enqueue(Path path, Map<String, String> attributes) throws IOException;
+ MockFlowFile enqueue(Path path, Map<String, String> attributes) throws IOException;
/**
* Copies the content from the given byte array into memory and creates a
@@ -403,7 +403,7 @@ public interface TestRunner {
*
* @param data to enqueue
*/
- void enqueue(byte[] data);
+ MockFlowFile enqueue(byte[] data);
/**
* Creates a FlowFile with the content set to the given string (in UTF-8 format), with no attributes,
@@ -411,7 +411,7 @@ public interface TestRunner {
*
* @param data to enqueue
*/
- void enqueue(String data);
+ MockFlowFile enqueue(String data);
/**
* Copies the content from the given byte array into memory and creates a
@@ -421,7 +421,7 @@ public interface TestRunner {
* @param data to enqueue
* @param attributes to use for enqueued item
*/
- void enqueue(byte[] data, Map<String, String> attributes);
+ MockFlowFile enqueue(byte[] data, Map<String, String> attributes);
/**
* Creates a FlowFile with the content set to the given string (in UTF-8 format), with the given attributes,
@@ -430,7 +430,7 @@ public interface TestRunner {
* @param data to enqueue
* @param attributes to use for enqueued item
*/
- void enqueue(String data, Map<String, String> attributes);
+ MockFlowFile enqueue(String data, Map<String, String> attributes);
/**
* Reads the content from the given {@link InputStream} into memory and
@@ -439,7 +439,7 @@ public interface TestRunner {
*
* @param data to source data from
*/
- void enqueue(InputStream data);
+ MockFlowFile enqueue(InputStream data);
/**
* Reads the content from the given {@link InputStream} into memory and
@@ -449,7 +449,7 @@ public interface TestRunner {
* @param data source of data
* @param attributes to use for flow files
*/
- void enqueue(InputStream data, Map<String, String> attributes);
+ MockFlowFile enqueue(InputStream data, Map<String, String> attributes);
/**
* Copies the contents of the given {@link MockFlowFile} into a byte array
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index b061fcc..8ca3494 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -223,7 +223,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
final byte[] demarcator = context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).isSet()
? context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
: null;
- final Map<String, String> props = new HashMap<>();
+ final Map<String, Object> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index fba8cb5..baacdc7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -74,7 +74,7 @@ public class ConsumerPool implements Closeable {
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
- final Map<String, String> kafkaProperties,
+ final Map<String, Object> kafkaProperties,
final List<String> topics,
final long maxWaitMillis,
final String keyEncoding,
@@ -148,7 +148,7 @@ public class ConsumerPool implements Closeable {
});
}
- private void closeConsumer(final Consumer consumer) {
+ private void closeConsumer(final Consumer<?, ?> consumer) {
consumerClosedCountRef.incrementAndGet();
try {
consumer.unsubscribe();
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
new file mode 100644
index 0000000..e7d5cb7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public class InFlightMessageTracker {
+ private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
+ private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
+ private final Object progressMutex = new Object();
+
+ public void incrementAcknowledgedCount(final FlowFile flowFile) {
+ final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
+ counter.incrementAcknowledgedCount();
+
+ synchronized (progressMutex) {
+ progressMutex.notify();
+ }
+ }
+
+ public int getAcknowledgedCount(final FlowFile flowFile) {
+ final Counts counter = messageCountsByFlowFile.get(flowFile);
+ return (counter == null) ? 0 : counter.getAcknowledgedCount();
+ }
+
+ public void incrementSentCount(final FlowFile flowFile) {
+ final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
+ counter.incrementSentCount();
+ }
+
+ public int getSentCount(final FlowFile flowFile) {
+ final Counts counter = messageCountsByFlowFile.get(flowFile);
+ return (counter == null) ? 0 : counter.getSentCount();
+ }
+
+ public void fail(final FlowFile flowFile, final Exception exception) {
+ failures.putIfAbsent(flowFile, exception);
+
+ synchronized (progressMutex) {
+ progressMutex.notify();
+ }
+ }
+
+ public Exception getFailure(final FlowFile flowFile) {
+ return failures.get(flowFile);
+ }
+
+ public boolean isFailed(final FlowFile flowFile) {
+ return getFailure(flowFile) != null;
+ }
+
+ public void reset() {
+ messageCountsByFlowFile.clear();
+ failures.clear();
+ }
+
+ public PublishResult failOutstanding(final Exception exception) {
+ messageCountsByFlowFile.keySet().stream()
+ .filter(ff -> !isComplete(ff))
+ .filter(ff -> !failures.containsKey(ff))
+ .forEach(ff -> failures.put(ff, exception));
+
+ return createPublishResult();
+ }
+
+ private boolean isComplete(final FlowFile flowFile) {
+ final Counts counts = messageCountsByFlowFile.get(flowFile);
+ if (counts.getAcknowledgedCount() == counts.getSentCount()) {
+ // all messages received successfully.
+ return true;
+ }
+
+ if (failures.containsKey(flowFile)) {
+ // FlowFile failed so is complete
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean isComplete() {
+ return messageCountsByFlowFile.keySet().stream()
+ .allMatch(flowFile -> isComplete(flowFile));
+ }
+
+ void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
+ final long startTime = System.nanoTime();
+ final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis);
+
+ while (System.nanoTime() < maxTime) {
+ synchronized (progressMutex) {
+ if (isComplete()) {
+ return;
+ }
+
+ progressMutex.wait(millis);
+ }
+ }
+
+ throw new TimeoutException();
+ }
+
+
+ PublishResult createPublishResult() {
+ return new PublishResult() {
+ @Override
+ public Collection<FlowFile> getSuccessfulFlowFiles() {
+ if (failures.isEmpty()) {
+ return messageCountsByFlowFile.keySet();
+ }
+
+ final Set<FlowFile> flowFiles = new HashSet<>(messageCountsByFlowFile.keySet());
+ flowFiles.removeAll(failures.keySet());
+ return flowFiles;
+ }
+
+ @Override
+ public Collection<FlowFile> getFailedFlowFiles() {
+ return failures.keySet();
+ }
+
+ @Override
+ public int getSuccessfulMessageCount(final FlowFile flowFile) {
+ return getAcknowledgedCount(flowFile);
+ }
+
+ @Override
+ public Exception getReasonForFailure(final FlowFile flowFile) {
+ return getFailure(flowFile);
+ }
+ };
+ }
+
+ public static class Counts {
+ private final AtomicInteger sentCount = new AtomicInteger(0);
+ private final AtomicInteger acknowledgedCount = new AtomicInteger(0);
+
+ public void incrementSentCount() {
+ sentCount.incrementAndGet();
+ }
+
+ public void incrementAcknowledgedCount() {
+ acknowledgedCount.incrementAndGet();
+ }
+
+ public int getAcknowledgedCount() {
+ return acknowledgedCount.get();
+ }
+
+ public int getSentCount() {
+ return sentCount.get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 707a431..3d09f2d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -27,8 +27,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
@@ -186,7 +187,7 @@ final class KafkaProcessorUtils {
final Class<?> classType;
- public KafkaConfigValidator(final Class classType) {
+ public KafkaConfigValidator(final Class<?> classType) {
this.classType = classType;
}
@@ -211,7 +212,8 @@ final class KafkaProcessorUtils {
return builder.toString();
}
- static void buildCommonKafkaProperties(final ProcessContext context, final Class kafkaConfigClass, final Map<String, String> mapToPopulate) {
+
+ static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
// Translate SSLContext Service configuration into Kafka properties
@@ -230,28 +232,33 @@ final class KafkaProcessorUtils {
mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
}
}
- String pName = propertyDescriptor.getName();
- String pValue = propertyDescriptor.isExpressionLanguageSupported()
+
+ String propertyName = propertyDescriptor.getName();
+ String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
: context.getProperty(propertyDescriptor).getValue();
- if (pValue != null) {
- if (pName.endsWith(".ms")) { // kafka standard time notation
- pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS));
+
+ if (propertyValue != null) {
+ // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
+ // or the standard NiFi time period such as "5 secs"
+ if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
+ propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
}
- if (isStaticStringFieldNamePresent(pName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
- mapToPopulate.put(pName, pValue);
+
+ if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
+ mapToPopulate.put(propertyName, propertyValue);
}
}
}
}
- private static boolean isStaticStringFieldNamePresent(final String name, final Class... classes) {
+ private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
}
- private static Set<String> getPublicStaticStringFieldValues(final Class... classes) {
+ private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) {
final Set<String> strings = new HashSet<>();
- for (final Class classType : classes) {
+ for (final Class<?> classType : classes) {
for (final Field field : classType.getDeclaredFields()) {
if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
try {
http://git-wip-us.apache.org/repos/asf/nifi/blob/b9cb6b1b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
deleted file mode 100644
index 31a084f..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.Closeable;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.stream.io.util.StreamDemarcator;
-
-/**
- * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
- * with sending contents of the {@link FlowFile}s to Kafka.
- */
-class KafkaPublisher implements Closeable {
-
- private final Producer<byte[], byte[]> kafkaProducer;
-
- private volatile long ackWaitTime = 30000;
-
- private final ComponentLog componentLog;
-
- private final int ackCheckSize;
-
- KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
- this(kafkaProperties, 100, componentLog);
- }
-
- /**
- * Creates an instance of this class as well as the instance of the
- * corresponding Kafka {@link KafkaProducer} using provided Kafka
- * configuration properties.
- *
- * @param kafkaProperties instance of {@link Properties} used to bootstrap
- * {@link KafkaProducer}
- */
- KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
- this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
- this.ackCheckSize = ackCheckSize;
- this.componentLog = componentLog;
- }
-
- /**
- * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
- * determine how many messages to Kafka will be sent from a provided
- * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
- * It supports two publishing modes:
- * <ul>
- * <li>Sending all messages constructed from
- * {@link StreamDemarcator#nextToken()} operation.</li>
- * <li>Sending only unacknowledged messages constructed from
- * {@link StreamDemarcator#nextToken()} operation.</li>
- * </ul>
- * The unacknowledged messages are determined from the value of
- * {@link PublishingContext#getLastAckedMessageIndex()}.
- * <br>
- * This method assumes content stream affinity where it is expected that the
- * content stream that represents the same Kafka message(s) will remain the
- * same across possible retries. This is required specifically for cases
- * where delimiter is used and a single content stream may represent
- * multiple Kafka messages. The
- * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
- * index of the last ACKed message, so upon retry only messages with the
- * higher index are sent.
- *
- * @param publishingContext instance of {@link PublishingContext} which hold
- * context information about the message(s) to be sent.
- * @return The index of the last successful offset.
- */
- KafkaPublisherResult publish(PublishingContext publishingContext) {
- StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
- publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
-
- int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
- List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
-
- byte[] messageBytes;
- int tokenCounter = 0;
- boolean continueSending = true;
- KafkaPublisherResult result = null;
- for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
- if (prevLastAckedMessageIndex < tokenCounter) {
- ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
- resultFutures.add(this.kafkaProducer.send(message));
-
- if (tokenCounter % this.ackCheckSize == 0) {
- int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
- resultFutures.clear();
- if (lastAckedMessageIndex % this.ackCheckSize != 0) {
- continueSending = false;
- result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
- }
- prevLastAckedMessageIndex = lastAckedMessageIndex;
- }
- }
- }
-
- if (result == null) {
- int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
- resultFutures.clear();
- result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
- }
- return result;
- }
-
- /**
- * Sets the time this publisher will wait for the {@link Future#get()}
- * operation (the Future returned by
- * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
- * out.
- *
- * This value will also be used as a timeout when closing the underlying
- * {@link KafkaProducer}. See {@link #close()}.
- */
- void setAckWaitTime(long ackWaitTime) {
- this.ackWaitTime = ackWaitTime;
- }
-
- /**
- * This operation will process ACKs from Kafka in the order in which
- * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning
- * the index of the last ACKed message. Within this operation processing ACK
- * simply means successful invocation of 'get()' operation on the
- * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)}
- * operation. Upon encountering any type of error while interrogating such
- * {@link Future} the ACK loop will end. Messages that were not ACKed would
- * be considered non-delivered and therefore could be resent at the later
- * time.
- *
- * @param sendFutures list of {@link Future}s representing results of
- * publishing to Kafka
- *
- * @param lastAckMessageIndex the index of the last ACKed message. It is
- * important to provide the last ACKed message especially while re-trying so
- * the proper index is maintained.
- */
- private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) {
- boolean exceptionThrown = false;
- for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) {
- Future<RecordMetadata> future = sendFutures.get(segmentCounter);
- try {
- future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
- lastAckMessageIndex++;
- } catch (InterruptedException e) {
- exceptionThrown = true;
- Thread.currentThread().interrupt();
- this.warnOrError("Interrupted while waiting for acks from Kafka", null);
- } catch (ExecutionException e) {
- exceptionThrown = true;
- this.warnOrError("Failed while waiting for acks from Kafka", e);
- } catch (TimeoutException e) {
- exceptionThrown = true;
- this.warnOrError("Timed out while waiting for acks from Kafka", null);
- }
- }
-
- return lastAckMessageIndex;
- }
-
- /**
- * Will close the underlying {@link KafkaProducer} waiting if necessary for
- * the same duration as supplied {@link #setAckWaitTime(long)}
- */
- @Override
- public void close() {
- this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
- }
-
- /**
- *
- */
- private void warnOrError(String message, Exception e) {
- if (e == null) {
- this.componentLog.warn(message);
- } else {
- this.componentLog.error(message, e);
- }
- }
-
- /**
- * Encapsulates the result received from publishing messages to Kafka
- */
- static class KafkaPublisherResult {
-
- private final int messagesSent;
- private final int lastMessageAcked;
-
- KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
- this.messagesSent = messagesSent;
- this.lastMessageAcked = lastMessageAcked;
- }
-
- public int getMessagesSent() {
- return this.messagesSent;
- }
-
- public int getLastMessageAcked() {
- return this.lastMessageAcked;
- }
-
- public boolean isAllAcked() {
- return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
- }
-
- @Override
- public String toString() {
- return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
- }
- }
-}