You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/07 13:38:43 UTC
nifi git commit: NIFI-1736 Move kafka.StreamScanner to nifi-utils.
This closes #333
Repository: nifi
Updated Branches:
refs/heads/pr/333 [created] 9235a28f8
NIFI-1736 Move kafka.StreamScanner to nifi-utils. This closes #333
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9235a28f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9235a28f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9235a28f
Branch: refs/heads/pr/333
Commit: 9235a28f82cece2eeb6de1b4767910d9b8bf8ddc
Parents: 3adb45e
Author: ijokarumawak <ij...@gmail.com>
Authored: Thu Apr 7 14:56:41 2016 +0900
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Thu Apr 7 07:37:28 2016 -0400
----------------------------------------------------------------------
.../nifi/stream/io/util/StreamScanner.java | 164 +++++++++++++++++++
.../nifi/stream/io/util/StreamScannerTests.java | 130 +++++++++++++++
.../nifi/processors/kafka/KafkaPublisher.java | 5 +-
.../nifi/processors/kafka/StreamScanner.java | 164 -------------------
.../processors/kafka/StreamScannerTests.java | 130 ---------------
5 files changed, 297 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java
new file mode 100644
index 0000000..901f31a
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Splits an {@link InputStream} into byte[] segments separated by an optional
+ * delimiter, using an iterator-like {@link #hasNext()} / {@link #next()}
+ * protocol. When no delimiter is supplied the whole stream is returned as a
+ * single segment.
+ * <p>
+ * Instances keep mutable state and perform no synchronization.
+ */
+public class StreamScanner {
+
+    private final static int EOF = -1;
+
+    private final InputStream is;
+
+    private final byte[] delimiterBytes;
+
+    private final int maxDataSize;
+
+    private ByteBuffer buffer;
+
+    private byte[] data;
+
+    /**
+     * Constructs a new instance with an initial internal buffer of 8192 bytes.
+     *
+     * @param is
+     *            instance of {@link InputStream} representing the data
+     * @param delimiterBytes
+     *            byte array representing delimiter bytes used to split the
+     *            input stream. Can be null or empty, in which case the whole
+     *            stream is treated as a single segment
+     * @param maxDataSize
+     *            maximum number of bytes that may be buffered while looking
+     *            for the end of a segment. The bytes of a delimiter count
+     *            towards this limit until the delimiter has been fully
+     *            matched, so neither the {@link InputStream} nor its
+     *            individual chunks (if delimiter is used) can ever be greater
+     *            than this size.
+     */
+    public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) {
+        this(is, delimiterBytes, maxDataSize, 8192);
+    }
+
+    /**
+     * Constructs a new instance
+     *
+     * @param is
+     *            instance of {@link InputStream} representing the data. It is
+     *            wrapped in a {@link BufferedInputStream}
+     * @param delimiterBytes
+     *            byte array representing delimiter bytes used to split the
+     *            input stream. Can be null or empty, in which case the whole
+     *            stream is treated as a single segment. The array is copied
+     * @param maxDataSize
+     *            maximum number of bytes that may be buffered while looking
+     *            for the end of a segment. The bytes of a delimiter count
+     *            towards this limit until the delimiter has been fully
+     *            matched, so neither the {@link InputStream} nor its
+     *            individual chunks (if delimiter is used) can ever be greater
+     *            than this size.
+     * @param initialBufferSize
+     *            initial size of the buffer used to buffer {@link InputStream}
+     *            or its parts (if delimiter is used) to create its byte[]
+     *            representation. Must not be negative. The buffer will grow
+     *            automatically as needed up to the Integer.MAX_VALUE;
+     */
+    public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
+        this.is = new BufferedInputStream(is);
+        // An empty delimiter can never be matched; treat it like "no delimiter"
+        // instead of failing with an index error on the first byte read.
+        this.delimiterBytes = (delimiterBytes == null || delimiterBytes.length == 0) ? null : delimiterBytes.clone();
+        this.buffer = ByteBuffer.allocate(initialBufferSize);
+        this.maxDataSize = maxDataSize;
+    }
+
+    /**
+     * Checks if there are more elements in the stream. This operation is
+     * idempotent.
+     *
+     * @return <i>true</i> if there are more elements in the stream or
+     *         <i>false</i> when it reaches the end of the stream after the last
+     *         element was retrieved via {@link #next()} operation.
+     * @throws IllegalStateException
+     *             if reading the stream fails, or if more than
+     *             <i>maxDataSize</i> bytes had to be buffered for one segment
+     */
+    public boolean hasNext() {
+        int readVal = 0;
+        while (this.data == null && readVal != EOF) {
+            this.expandBufferIfNecessary();
+            try {
+                readVal = this.is.read();
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed while reading InputStream", e);
+            }
+            if (readVal == EOF) {
+                // whatever is left in the buffer (if anything) is the last segment
+                this.extractDataToken(0);
+            } else {
+                this.buffer.put((byte) readVal);
+                if (this.buffer.position() > this.maxDataSize) {
+                    throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
+                }
+                if (this.delimiterBytes != null && this.endsWithDelimiter()) {
+                    this.extractDataToken(this.delimiterBytes.length);
+                }
+            }
+        }
+        return this.data != null;
+    }
+
+    /**
+     * Returns the segment prepared by the preceding {@link #hasNext()} call and
+     * clears it, so every segment is handed out only once.
+     *
+     * @return byte array representing the next segment in the stream or the
+     *         whole stream if no delimiter is used; <i>null</i> if no segment
+     *         is pending, e.g. {@link #hasNext()} was not called or returned
+     *         <i>false</i>
+     */
+    public byte[] next() {
+        final byte[] segment = this.data;
+        this.data = null;
+        return segment;
+    }
+
+    /**
+     * Checks whether the bytes buffered so far end with the delimiter. Comparing
+     * against the buffered bytes (rather than tracking a running match index)
+     * also finds a delimiter that starts inside a failed partial match, e.g.
+     * delimiter "ab" in the input "aab".
+     */
+    private boolean endsWithDelimiter() {
+        final int offset = this.buffer.position() - this.delimiterBytes.length;
+        if (offset < 0) {
+            return false;
+        }
+        // compare back to front so that a mismatch on the newest byte is detected first
+        for (int i = this.delimiterBytes.length - 1; i >= 0; i--) {
+            if (this.buffer.get(offset + i) != this.delimiterBytes[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Doubles the capacity of the internal buffer once it is full, preserving
+     * the bytes buffered so far.
+     */
+    private void expandBufferIfNecessary() {
+        if (this.buffer.position() == Integer.MAX_VALUE) {
+            throw new IllegalStateException("Internal buffer has reached the capacity and can not be expended any further");
+        }
+        if (this.buffer.remaining() == 0) {
+            // widen to long so that doubling can not overflow; grow to at least
+            // one byte so that a zero-sized initial buffer can still expand
+            final long doubled = Math.max(1L, this.buffer.capacity() * 2L);
+            final ByteBuffer expanded = ByteBuffer.allocate((int) Math.min(Integer.MAX_VALUE, doubled));
+            this.buffer.flip();
+            expanded.put(this.buffer); // leaves the position right after the copied bytes
+            this.buffer = expanded;
+        }
+    }
+
+    /**
+     * Copies the buffered bytes, minus the trailing <i>lengthSubtract</i>
+     * delimiter bytes, into the pending segment and resets the buffer. Nothing
+     * is produced if the buffer is empty.
+     */
+    private void extractDataToken(int lengthSubtract) {
+        this.buffer.flip();
+        if (this.buffer.limit() > 0) { // something must be in the buffer; at least delimiter (e.g., \n)
+            this.data = new byte[this.buffer.limit() - lengthSubtract];
+            this.buffer.get(this.data);
+        }
+        this.buffer.clear();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java
new file mode 100644
index 0000000..2dc8f0b
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StreamScanner}.
+ */
+public class StreamScannerTests {
+
+    @Test
+    public void validateWithMultiByteCharsNoDelimiter() {
+        String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
+        // encode explicitly: the assertion below decodes as UTF-8, so relying on
+        // the platform default charset would break on non-UTF-8 platforms
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamScanner scanner = new StreamScanner(is, null, 1000);
+        assertTrue(scanner.hasNext());
+        assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8));
+        assertFalse(scanner.hasNext());
+    }
+
+    @Test
+    public void validateWithComplexDelimiter() {
+        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
+        assertTrue(scanner.hasNext());
+        assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
+        assertTrue(scanner.hasNext());
+        assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
+        assertTrue(scanner.hasNext());
+        assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
+        assertFalse(scanner.hasNext());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void validateMaxBufferSize() {
+        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        // 15 bytes of text plus the first 6 delimiter bytes already exceed the limit of 20
+        StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 20);
+        assertTrue(scanner.hasNext());
+    }
+
+    @Test
+    public void verifyScannerHandlesNegativeOneByteInputs() {
+        ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0});
+        StreamScanner scanner = new StreamScanner(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024);
+        assertTrue(scanner.hasNext());
+        Assert.assertArrayEquals(new byte[]{0, 0, 0, 0, -1, 0, 0, 0}, scanner.next());
+        assertFalse(scanner.hasNext());
+    }
+
+    @Test
+    public void verifyScannerHandlesNegativeOneByteDelimiter() {
+        ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0});
+        StreamScanner scanner = new StreamScanner(is, new byte[] { -1 }, 20, 1024);
+        assertTrue(scanner.hasNext());
+        Assert.assertArrayEquals(new byte[]{0, 0, 0, 0}, scanner.next());
+        assertTrue(scanner.hasNext());
+        Assert.assertArrayEquals(new byte[]{0, 0, 0}, scanner.next());
+        assertFalse(scanner.hasNext());
+    }
+
+    @Test
+    public void validateHasNextIdempotencyWithDelimiter() {
+        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
+        for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries
+            assertTrue(scanner.hasNext());
+        }
+        assertTrue(scanner.hasNext());
+        assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
+        assertTrue(scanner.hasNext());
+        assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
+        assertTrue(scanner.hasNext());
+        assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
+        assertFalse(scanner.hasNext());
+    }
+
+    @Test
+    public void validateHasNextIdempotencyWithoutDelimiter() {
+        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamScanner scanner = new StreamScanner(is, null, 1000);
+        for (int i = 0; i < 5; i++) { // without a delimiter there is a single segment; hasNext must stay true until it is consumed
+            assertTrue(scanner.hasNext());
+        }
+        assertTrue(scanner.hasNext());
+        assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8));
+        assertFalse(scanner.hasNext());
+    }
+
+    @Test
+    public void validateInternalBufferCanExpend() throws Exception {
+        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamScanner scanner = new StreamScanner(is, null, 1000, 2);
+        Field bufferField = StreamScanner.class.getDeclaredField("buffer");
+        bufferField.setAccessible(true);
+        ByteBuffer buffer = (ByteBuffer) bufferField.get(scanner);
+        assertEquals(2, buffer.capacity());
+
+        assertTrue(scanner.hasNext());
+        assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8));
+        assertFalse(scanner.hasNext());
+
+        // 82 bytes of input, capacity doubles from 2: 4, 8, 16, 32, 64, 128
+        buffer = (ByteBuffer) bufferField.get(scanner);
+        assertEquals(128, buffer.capacity());
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index ebdf5c8..bcf10a4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.stream.io.util.StreamScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +64,7 @@ class KafkaPublisher implements AutoCloseable {
KafkaPublisher(Properties kafkaProperties) {
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- this.producer = new KafkaProducer<byte[], byte[]>(kafkaProperties);
+ this.producer = new KafkaProducer<>(kafkaProperties);
this.ackWaitTime = Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2;
try {
if (kafkaProperties.containsKey("partitioner.class")){
@@ -132,7 +133,7 @@ class KafkaPublisher implements AutoCloseable {
partitionKey = this.getPartition(key, topicName);
}
if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) {
- ProducerRecord<byte[], byte[]> message = new ProducerRecord<byte[], byte[]>(topicName, partitionKey, key, content);
+ ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(topicName, partitionKey, key, content);
sendFutures.add(this.toKafka(message));
}
segmentCounter++;
http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
deleted file mode 100644
index 57bbbcf..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- *
- */
-class StreamScanner {
-
- private final static int EOF = -1;
-
- private final InputStream is;
-
- private final byte[] delimiterBytes;
-
- private final int maxDataSize;
-
- private ByteBuffer buffer;
-
- private byte[] data;
-
- /**
- * Constructs a new instance
- *
- * @param is
- * instance of {@link InputStream} representing the data
- * @param delimiterBytes
- * byte array representing delimiter bytes used to split the
- * input stream. Can be null
- * @param maxDataSize
- * maximum size of data derived from the input stream. This means
- * that neither {@link InputStream} nor its individual chunks (if
- * delimiter is used) can ever be greater then this size.
- */
- StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) {
- this(is, delimiterBytes, maxDataSize, 8192);
- }
-
- /**
- * Constructs a new instance
- *
- * @param is
- * instance of {@link InputStream} representing the data
- * @param delimiterBytes
- * byte array representing delimiter bytes used to split the
- * input stream. Can be null
- * @param maxDataSize
- * maximum size of data derived from the input stream. This means
- * that neither {@link InputStream} nor its individual chunks (if
- * delimiter is used) can ever be greater then this size.
- * @param initialBufferSize
- * initial size of the buffer used to buffer {@link InputStream}
- * or its parts (if delimiter is used) to create its byte[]
- * representation. Must be positive integer. The buffer will grow
- * automatically as needed up to the Integer.MAX_VALUE;
- *
- */
- StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
- this.is = new BufferedInputStream(is);
- this.delimiterBytes = delimiterBytes;
- this.buffer = ByteBuffer.allocate(initialBufferSize);
- this.maxDataSize = maxDataSize;
- }
-
- /**
- * Checks if there are more elements in the stream. This operation is
- * idempotent.
- *
- * @return <i>true</i> if there are more elements in the stream or
- * <i>false</i> when it reaches the end of the stream after the last
- * element was retrieved via {@link #next()} operation.
- */
- boolean hasNext() {
- int j = 0;
- int readVal = 0;
- while (this.data == null && readVal != EOF) {
- this.expandBufferIfNecessary();
- try {
- readVal = this.is.read();
- } catch (IOException e) {
- throw new IllegalStateException("Failed while reading InputStream", e);
- }
- if (readVal == EOF) {
- this.extractDataToken(0);
- } else {
- byte byteVal = (byte)readVal;
- this.buffer.put(byteVal);
- if (this.buffer.position() > this.maxDataSize) {
- throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
- }
- if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) {
- if (++j == this.delimiterBytes.length) {
- this.extractDataToken(this.delimiterBytes.length);
- j = 0;
- }
- } else {
- j = 0;
- }
- }
- }
- return this.data != null;
- }
-
- /**
- * @return byte array representing the next segment in the stream or the
- * whole stream if no delimiter is used
- */
- byte[] next() {
- try {
- return this.data;
- } finally {
- this.data = null;
- }
- }
-
- /**
- *
- */
- private void expandBufferIfNecessary() {
- if (this.buffer.position() == Integer.MAX_VALUE ){
- throw new IllegalStateException("Internal buffer has reached the capacity and can not be expended any further");
- }
- if (this.buffer.remaining() == 0) {
- this.buffer.flip();
- int pos = this.buffer.capacity();
- int newSize = this.buffer.capacity() * 2 > Integer.MAX_VALUE ? Integer.MAX_VALUE : this.buffer.capacity() * 2;
- ByteBuffer bb = ByteBuffer.allocate(newSize);
- bb.put(this.buffer);
- this.buffer = bb;
- this.buffer.position(pos);
- }
- }
-
- /**
- *
- */
- private void extractDataToken(int lengthSubtract) {
- this.buffer.flip();
- if (this.buffer.limit() > 0){ // something must be in the buffer; at least delimiter (e.g., \n)
- this.data = new byte[this.buffer.limit() - lengthSubtract];
- this.buffer.get(this.data);
- }
- this.buffer.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java
deleted file mode 100644
index 1ebc4c4..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class StreamScannerTests {
-
- @Test
- public void validateWithMultiByteCharsNoDelimiter() {
- String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
- ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
- StreamScanner scanner = new StreamScanner(is, null, 1000);
- assertTrue(scanner.hasNext());
- assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8));
- assertFalse(scanner.hasNext());
- }
-
- @Test
- public void validateWithComplexDelimiter() {
- String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
- ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
- StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
- assertTrue(scanner.hasNext());
- assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
- assertTrue(scanner.hasNext());
- assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
- assertTrue(scanner.hasNext());
- assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
- assertFalse(scanner.hasNext());
- }
-
- @Test(expected = IllegalStateException.class)
- public void validateMaxBufferSize() {
- String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
- ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
- StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 20);
- assertTrue(scanner.hasNext());
- }
-
- @Test
- public void verifyScannerHandlesNegativeOneByteInputs() {
- ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0});
- StreamScanner scanner = new StreamScanner(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024);
- assertTrue(scanner.hasNext());
- Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0, -1, 0, 0, 0});
- }
-
- @Test
- public void verifyScannerHandlesNegativeOneByteDelimiter() {
- ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0});
- StreamScanner scanner = new StreamScanner(is, new byte[] { -1 }, 20, 1024);
- assertTrue(scanner.hasNext());
- Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0});
- assertTrue(scanner.hasNext());
- Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0});
- }
-
- @Test
- public void validateHasNextIdempotencyWithDelimiter() {
- String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
- ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
- StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
- for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries
- assertTrue(scanner.hasNext());
- }
- assertTrue(scanner.hasNext());
- assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
- assertTrue(scanner.hasNext());
- assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
- assertTrue(scanner.hasNext());
- assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8));
- assertFalse(scanner.hasNext());
- }
-
- @Test
- public void validateHasNextIdempotencyWithoutDelimiter() {
- String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
- ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
- StreamScanner scanner = new StreamScanner(is, null, 1000);
- for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries
- assertTrue(scanner.hasNext());
- }
- assertTrue(scanner.hasNext());
- assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8));
- assertFalse(scanner.hasNext());
- }
-
- @Test
- public void validateInternalBufferCanExpend() throws Exception {
- String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
- ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
- StreamScanner scanner = new StreamScanner(is, null, 1000, 2);
- Field bufferField = StreamScanner.class.getDeclaredField("buffer");
- bufferField.setAccessible(true);
- ByteBuffer buffer = (ByteBuffer) bufferField.get(scanner);
- assertEquals(2, buffer.capacity());
-
- assertTrue(scanner.hasNext());
- assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8));
- assertFalse(scanner.hasNext());
-
- buffer = (ByteBuffer) bufferField.get(scanner);
- assertEquals(128, buffer.capacity());
- }
-}