You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/04/05 15:37:17 UTC
[1/2] nifi git commit: NIFI-1728 Specifying charsets for messages
sent to/received from Kafka in Kafka processor and related tests to remedy
failures in Windows environments. Specifying EOF as an int instead of a byte.
Repository: nifi
Updated Branches:
refs/heads/support/nifi-0.6.x cd0d090e8 -> d1c82ac62
NIFI-1728 Specifying charsets for messages sent to/received from Kafka in Kafka processor and related tests to remedy failures in Windows environments. Specifying EOF as an int instead of a byte.
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ffa852de
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ffa852de
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ffa852de
Branch: refs/heads/support/nifi-0.6.x
Commit: ffa852de1319e4fb0c1341ab9edafe684249a67d
Parents: cd0d090
Author: Aldrin Piri <al...@apache.org>
Authored: Mon Apr 4 18:21:59 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Apr 5 09:17:03 2016 -0400
----------------------------------------------------------------------
.../nifi/processors/kafka/StreamScanner.java | 2 +-
.../processors/kafka/KafkaPublisherTest.java | 4 +-
.../nifi/processors/kafka/TestPutKafka.java | 51 ++++++++++----------
3 files changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ffa852de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
index ee83a02..57bbbcf 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
@@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
*/
class StreamScanner {
- private final static byte EOF = -1;
+ private final static int EOF = -1;
private final InputStream is;
http://git-wip-us.apache.org/repos/asf/nifi/blob/ffa852de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
index f21dfb0..2abb51a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
@@ -179,7 +179,7 @@ public class KafkaPublisherTest {
@Test
public void validateWithMultiByteCharacters() throws Exception {
String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
- InputStream fis = new ByteArrayInputStream(data.getBytes());
+ InputStream fis = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
String topicName = "validateWithMultiByteCharacters";
Properties kafkaProperties = this.buildProducerProperties();
@@ -192,7 +192,7 @@ public class KafkaPublisherTest {
publisher.close();
ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
- String r = new String(iter.next().message());
+ String r = new String(iter.next().message(), StandardCharsets.UTF_8);
assertEquals(data, r);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ffa852de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 34544df..5d75d54 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -77,18 +77,18 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
- runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes());
+ runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
- assertEquals("Hello World", new String(consumer.next().message()));
- assertEquals("Goodbye", new String(consumer.next().message()));
- assertEquals("1", new String(consumer.next().message()));
- assertEquals("2", new String(consumer.next().message()));
- assertEquals("3", new String(consumer.next().message()));
- assertEquals("4", new String(consumer.next().message()));
- assertEquals("5", new String(consumer.next().message()));
+ assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("1", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("3", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("4", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("5", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}
@@ -106,14 +106,14 @@ public class TestPutKafka {
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final String text = "Hello World\nGoodbye\n1\n2";
- runner.enqueue(text.getBytes());
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
afterClass(); // kill Kafka right before send to ensure producer fails
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
MockFlowFile ff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
String failedSegmentsStr = ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS);
- BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes());
+ BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes(StandardCharsets.UTF_8));
assertTrue(fs.get(0));
assertTrue(fs.get(1));
assertTrue(fs.get(2));
@@ -148,8 +148,8 @@ public class TestPutKafka {
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
- assertEquals("Goodbye", new String(consumer.next().message()));
- assertEquals("2", new String(consumer.next().message()));
+ assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8));
try {
consumer.next();
fail();
@@ -169,7 +169,7 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
- final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+ final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
runner.enqueue(bytes);
runner.run(1);
@@ -198,14 +198,14 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
- runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes());
+ runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
- assertEquals("Hello World", new String(consumer.next().message()));
- assertEquals("Goodbye", new String(consumer.next().message()));
- assertEquals("I Mean IT!僠<僠WILDSTUFF僠>", new String(consumer.next().message()));
+ assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("I Mean IT!僠<僠WILDSTUFF僠>", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}
@@ -219,15 +219,16 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
- runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes());
+ runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
- assertEquals("Hello World", new String(consumer.next().message()));
- assertEquals("Goodbye", new String(consumer.next().message()));
- assertEquals("I Mean IT!", new String(consumer.next().message()));
- assertEquals("<僠WILDSTUFF僠>僠", new String(consumer.next().message()));
+ byte[] message = consumer.next().message();
+ assertEquals("Hello World", new String(message, StandardCharsets.UTF_8));
+ assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("I Mean IT!", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("<僠WILDSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}
@@ -241,13 +242,13 @@ public class TestPutKafka {
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
- runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes());
+ runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
- assertEquals("Hello World", new String(consumer.next().message()));
- assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message()));
+ assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}
[2/2] nifi git commit: NIFI-1689 Correcting TimeUnit conversion to be
milliseconds instead of the incorrect microseconds and using historical
modified times to avoid issues with second granularity of HFS+
Posted by jo...@apache.org.
NIFI-1689 Correcting TimeUnit conversion to be milliseconds instead of the incorrect microseconds and using historical modified times to avoid issues with second granularity of HFS+
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1c82ac6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1c82ac6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1c82ac6
Branch: refs/heads/support/nifi-0.6.x
Commit: d1c82ac622787ce897fae46d3407658a54705f50
Parents: ffa852d
Author: Aldrin Piri <al...@apache.org>
Authored: Mon Apr 4 16:01:16 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Apr 5 09:17:24 2016 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/TestListFile.java | 31 ++++++++++++--------
1 file changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1c82ac6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 5b6ea04..a3e5a94 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -266,7 +266,7 @@ public class TestListFile {
fos.write(bytes1000);
fos.close();
- final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = getTestModifiedTime();
assertTrue(file1.setLastModified(now));
assertTrue(file2.setLastModified(now));
assertTrue(file3.setLastModified(now));
@@ -342,7 +342,7 @@ public class TestListFile {
@Test
public void testFilterHidden() throws Exception {
- final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
FileOutputStream fos;
@@ -398,8 +398,7 @@ public class TestListFile {
@Test
public void testFilterFilePattern() throws Exception {
-
- final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = getTestModifiedTime();
final File file1 = new File(TESTDIR + "/file1-abc-apple.txt");
assertTrue(file1.createNewFile());
@@ -417,12 +416,6 @@ public class TestListFile {
assertTrue(file4.createNewFile());
assertTrue(file4.setLastModified(now));
- System.out.println(file1.lastModified());
- System.out.println(file2.lastModified());
- System.out.println(file3.lastModified());
- System.out.println(file4.lastModified());
-
-
// check all files
runner.clearTransferState();
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
@@ -453,7 +446,7 @@ public class TestListFile {
@Test
public void testFilterPathPattern() throws Exception {
- final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
final File subdir1 = new File(TESTDIR + "/subdir1");
assertTrue(subdir1.mkdirs());
@@ -522,7 +515,7 @@ public class TestListFile {
@Test
public void testRecurse() throws Exception {
- final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = getTestModifiedTime();
final File subdir1 = new File(TESTDIR + "/subdir1");
assertTrue(subdir1.mkdirs());
@@ -595,7 +588,7 @@ public class TestListFile {
@Test
public void testReadable() throws Exception {
- final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ final long now = getTestModifiedTime();
final File file1 = new File(TESTDIR + "/file1.txt");
assertTrue(file1.createNewFile());
@@ -701,6 +694,18 @@ public class TestListFile {
assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build()));
}
+ /*
+ * HFS+, default for OS X, only has granularity to one second; accordingly, we go back in time and use whole seconds to establish consistent test cases
+ *
+ * Provides wall-clock "now" truncated to whole seconds, minus 1 second, in epoch millis (the unit File.setLastModified expects); System.nanoTime() has an arbitrary origin and must not be used here
+ */
+ private static long getTestModifiedTime() {
+ final long nowMillis = System.currentTimeMillis();
+ // Subtract a second to avoid possible rounding issues
+ final long nowSeconds = TimeUnit.SECONDS.convert(nowMillis, TimeUnit.MILLISECONDS) - 1;
+ return TimeUnit.MILLISECONDS.convert(nowSeconds, TimeUnit.SECONDS);
+ }
+
public void resetAges() {
syncTime = System.currentTimeMillis();