You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/04/05 15:37:17 UTC

[1/2] nifi git commit: NIFI-1728 Specifying charsets for messages sent to/received from Kafka in Kafka processor and related tests to remedy failures in Windows environments. Specifying EOF as an int instead of a byte.

Repository: nifi
Updated Branches:
  refs/heads/support/nifi-0.6.x cd0d090e8 -> d1c82ac62


NIFI-1728 Specifying charsets for messages sent to/received from Kafka in Kafka processor and related tests to remedy failures in Windows environments. Specifying EOF as an int instead of a byte.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ffa852de
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ffa852de
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ffa852de

Branch: refs/heads/support/nifi-0.6.x
Commit: ffa852de1319e4fb0c1341ab9edafe684249a67d
Parents: cd0d090
Author: Aldrin Piri <al...@apache.org>
Authored: Mon Apr 4 18:21:59 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Apr 5 09:17:03 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/kafka/StreamScanner.java    |  2 +-
 .../processors/kafka/KafkaPublisherTest.java    |  4 +-
 .../nifi/processors/kafka/TestPutKafka.java     | 51 ++++++++++----------
 3 files changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ffa852de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
index ee83a02..57bbbcf 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
@@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
  */
 class StreamScanner {
 
-    private final static byte EOF = -1;
+    private final static int EOF = -1;
 
     private final InputStream is;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ffa852de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
index f21dfb0..2abb51a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
@@ -179,7 +179,7 @@ public class KafkaPublisherTest {
     @Test
     public void validateWithMultiByteCharacters() throws Exception {
         String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
-        InputStream fis = new ByteArrayInputStream(data.getBytes());
+        InputStream fis = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         String topicName = "validateWithMultiByteCharacters";
 
         Properties kafkaProperties = this.buildProducerProperties();
@@ -192,7 +192,7 @@ public class KafkaPublisherTest {
         publisher.close();
 
         ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        String r = new String(iter.next().message());
+        String r = new String(iter.next().message(), StandardCharsets.UTF_8);
         assertEquals(data, r);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ffa852de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 34544df..5d75d54 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -77,18 +77,18 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
-        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes());
+        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
         runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
         ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
-        assertEquals("Hello World", new String(consumer.next().message()));
-        assertEquals("Goodbye", new String(consumer.next().message()));
-        assertEquals("1", new String(consumer.next().message()));
-        assertEquals("2", new String(consumer.next().message()));
-        assertEquals("3", new String(consumer.next().message()));
-        assertEquals("4", new String(consumer.next().message()));
-        assertEquals("5", new String(consumer.next().message()));
+        assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("1", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("3", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("4", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("5", new String(consumer.next().message(), StandardCharsets.UTF_8));
 
         runner.shutdown();
     }
@@ -106,14 +106,14 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
         final String text = "Hello World\nGoodbye\n1\n2";
-        runner.enqueue(text.getBytes());
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
         afterClass(); // kill Kafka right before send to ensure producer fails
         runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
         MockFlowFile ff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
         String failedSegmentsStr = ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS);
-        BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes());
+        BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes(StandardCharsets.UTF_8));
         assertTrue(fs.get(0));
         assertTrue(fs.get(1));
         assertTrue(fs.get(2));
@@ -148,8 +148,8 @@ public class TestPutKafka {
 
         ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
 
-        assertEquals("Goodbye", new String(consumer.next().message()));
-        assertEquals("2", new String(consumer.next().message()));
+        assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8));
         try {
             consumer.next();
             fail();
@@ -169,7 +169,7 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
-        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
         runner.enqueue(bytes);
         runner.run(1);
 
@@ -198,14 +198,14 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
 
-        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes());
+        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes(StandardCharsets.UTF_8));
         runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
         ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
-        assertEquals("Hello World", new String(consumer.next().message()));
-        assertEquals("Goodbye", new String(consumer.next().message()));
-        assertEquals("I Mean IT!僠<僠WILDSTUFF僠>", new String(consumer.next().message()));
+        assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("I Mean IT!僠<僠WILDSTUFF僠>", new String(consumer.next().message(), StandardCharsets.UTF_8));
         runner.shutdown();
     }
 
@@ -219,15 +219,16 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
 
-        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes());
+        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
         runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
         ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
-        assertEquals("Hello World", new String(consumer.next().message()));
-        assertEquals("Goodbye", new String(consumer.next().message()));
-        assertEquals("I Mean IT!", new String(consumer.next().message()));
-        assertEquals("<僠WILDSTUFF僠>僠", new String(consumer.next().message()));
+        byte[] message = consumer.next().message();
+        assertEquals("Hello World", new String(message, StandardCharsets.UTF_8));
+        assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("I Mean IT!", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("<僠WILDSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8));
         runner.shutdown();
     }
 
@@ -241,13 +242,13 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
 
-        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes());
+        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
         runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
         ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
-        assertEquals("Hello World", new String(consumer.next().message()));
-        assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message()));
+        assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8));
         runner.shutdown();
     }
 


[2/2] nifi git commit: NIFI-1689 Correcting TimeUnit conversion to be milliseconds instead of the incorrect microseconds and using historical modified times to avoid issues with second granularity of HFS+

Posted by jo...@apache.org.
NIFI-1689 Correcting TimeUnit conversion to be milliseconds instead of the incorrect microseconds and using historical modified times to avoid issues with second granularity of HFS+

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1c82ac6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1c82ac6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1c82ac6

Branch: refs/heads/support/nifi-0.6.x
Commit: d1c82ac622787ce897fae46d3407658a54705f50
Parents: ffa852d
Author: Aldrin Piri <al...@apache.org>
Authored: Mon Apr 4 16:01:16 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Apr 5 09:17:24 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/TestListFile.java  | 31 ++++++++++++--------
 1 file changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d1c82ac6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 5b6ea04..a3e5a94 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -266,7 +266,7 @@ public class TestListFile {
         fos.write(bytes1000);
         fos.close();
 
-        final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+        final long now = getTestModifiedTime();
         assertTrue(file1.setLastModified(now));
         assertTrue(file2.setLastModified(now));
         assertTrue(file3.setLastModified(now));
@@ -342,7 +342,7 @@ public class TestListFile {
 
     @Test
     public void testFilterHidden() throws Exception {
-        final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+        final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
 
         FileOutputStream fos;
 
@@ -398,8 +398,7 @@ public class TestListFile {
 
     @Test
     public void testFilterFilePattern() throws Exception {
-
-        final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+        final long now = getTestModifiedTime();
 
         final File file1 = new File(TESTDIR + "/file1-abc-apple.txt");
         assertTrue(file1.createNewFile());
@@ -417,12 +416,6 @@ public class TestListFile {
         assertTrue(file4.createNewFile());
         assertTrue(file4.setLastModified(now));
 
-        System.out.println(file1.lastModified());
-        System.out.println(file2.lastModified());
-        System.out.println(file3.lastModified());
-        System.out.println(file4.lastModified());
-
-
         // check all files
         runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
@@ -453,7 +446,7 @@ public class TestListFile {
 
     @Test
     public void testFilterPathPattern() throws Exception {
-        final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+        final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
 
         final File subdir1 = new File(TESTDIR + "/subdir1");
         assertTrue(subdir1.mkdirs());
@@ -522,7 +515,7 @@ public class TestListFile {
 
     @Test
     public void testRecurse() throws Exception {
-        final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+        final long now = getTestModifiedTime();
 
         final File subdir1 = new File(TESTDIR + "/subdir1");
         assertTrue(subdir1.mkdirs());
@@ -595,7 +588,7 @@ public class TestListFile {
 
     @Test
     public void testReadable() throws Exception {
-        final long now = TimeUnit.MICROSECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+        final long now = getTestModifiedTime();
 
         final File file1 = new File(TESTDIR + "/file1.txt");
         assertTrue(file1.createNewFile());
@@ -701,6 +694,18 @@ public class TestListFile {
         assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build()));
     }
 
+    /*
+     * HFS+, default for OS X, only has granularity to one second, accordingly, we go back in time to establish consistent test cases
+     *
+     * Provides "now" minus 1 second in millis
+    */
+    private static long getTestModifiedTime() {
+        final long nowNanos = System.nanoTime();
+        // Subtract a second to avoid possible rounding issues
+        final long nowSeconds = TimeUnit.SECONDS.convert(nowNanos, TimeUnit.NANOSECONDS) - 1;
+        return TimeUnit.MILLISECONDS.convert(nowSeconds, TimeUnit.SECONDS);
+    }
+
     public void resetAges() {
         syncTime = System.currentTimeMillis();