You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/01/19 20:18:27 UTC
[1/2] nifi git commit: NIFI-3216: Add N signals to Wait/Notify
Repository: nifi
Updated Branches:
refs/heads/master e62eeb756 -> 7f0171ffa
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
index 37c4c43..e5c183f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
@@ -16,26 +16,29 @@
*/
package org.apache.nifi.processors.standard;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+import org.apache.nifi.distributed.cache.client.StandardCacheEntry;
+import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
public class TestNotify {
private TestRunner runner;
@@ -66,8 +69,47 @@ public class TestNotify {
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
runner.clearTransferState();
- Map<String, String> cachedAttributes = service.get("1", new Notify.StringSerializer(), new FlowFileAttributesSerializer());
+ final Signal signal = new WaitNotifyProtocol(service).getSignal("1");
+ Map<String, String> cachedAttributes = signal.getAttributes();
assertEquals("value", cachedAttributes.get("key"));
+ assertTrue(signal.isTotalCountReached(1));
+ }
+
+ @Test
+ public void testNotifyCounters() throws InitializationException, IOException {
+ runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+ runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
+ runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
+
+ final Map<String, String> props1 = new HashMap<>();
+ props1.put("releaseSignalAttribute", "someDataProcessing");
+ props1.put("key", "data1");
+ props1.put("status", "success");
+ runner.enqueue(new byte[]{}, props1);
+
+ final Map<String, String> props2 = new HashMap<>();
+ props2.put("releaseSignalAttribute", "someDataProcessing");
+ props2.put("key", "data2");
+ props2.put("status", "success");
+ runner.enqueue(new byte[]{}, props2);
+
+ final Map<String, String> props3 = new HashMap<>();
+ props3.put("releaseSignalAttribute", "someDataProcessing");
+ props3.put("key", "data3");
+ props3.put("status", "failure");
+ runner.enqueue(new byte[]{}, props3);
+
+ runner.run(3);
+
+ runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
+ runner.clearTransferState();
+
+ final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
+ Map<String, String> cachedAttributes = signal.getAttributes();
+ assertEquals("Same attribute key will be overwritten by the latest signal", "data3", cachedAttributes.get("key"));
+ assertTrue(signal.isTotalCountReached(3));
+ assertEquals(2, signal.getCount("success"));
+ assertEquals(1, signal.getCount("failure"));
}
@Test
@@ -86,9 +128,11 @@ public class TestNotify {
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
runner.clearTransferState();
- Map<String, String> cachedAttributes = service.get("1", new Notify.StringSerializer(), new FlowFileAttributesSerializer());
+ final Signal signal = new WaitNotifyProtocol(service).getSignal("1");
+ Map<String, String> cachedAttributes = signal.getAttributes();
assertEquals("value", cachedAttributes.get("key1"));
assertNull(cachedAttributes.get("other.key1"));
+ assertTrue(signal.isTotalCountReached(1));
}
@Test
@@ -121,11 +165,11 @@ public class TestNotify {
service.setFailOnCalls(false);
}
- private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
- private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
+ static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient {
+ private final ConcurrentMap<Object, CacheEntry> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false;
- public void setFailOnCalls(boolean failOnCalls){
+ void setFailOnCalls(boolean failOnCalls){
this.failOnCalls = failOnCalls;
}
@@ -136,42 +180,47 @@ public class TestNotify {
}
}
+ private void unsupported() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("This method shouldn't be used from Notify processor.");
+ }
+
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
- verifyNotFail();
- final Object retValue = values.putIfAbsent(key, value);
- return (retValue == null);
+ unsupported();
+ return false;
}
@Override
@SuppressWarnings("unchecked")
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) throws IOException {
- verifyNotFail();
- return (V) values.putIfAbsent(key, value);
+ unsupported();
+ return null;
}
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
- verifyNotFail();
- return values.containsKey(key);
+ unsupported();
+ return false;
}
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
- verifyNotFail();
- values.put(key, value);
+ unsupported();
}
@Override
@SuppressWarnings("unchecked")
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail();
- if(values.containsKey(key)) {
- return (V) values.get(key);
- } else {
+
+ final CacheEntry entry = values.get(key);
+ if (entry == null) {
return null;
}
+
+ // This mock cache stores String as it is, without serializing, so it needs to convert it to byte[] first here.
+ return valueDeserializer.deserialize(((String)entry.getValue()).getBytes(StandardCharsets.UTF_8));
}
@Override
@@ -181,7 +230,28 @@ public class TestNotify {
@Override
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
verifyNotFail();
- values.remove(key);
+ return values.remove(key) != null;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <K, V> CacheEntry<K, V> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ verifyNotFail();
+
+ return values.get(key);
+ }
+
+ @Override
+ public <K, V> boolean replace(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long revision) throws IOException {
+ verifyNotFail();
+
+ final CacheEntry existing = values.get(key);
+ if (existing != null && existing.getRevision() != revision) {
+ return false;
+ }
+
+ values.put(key, new StandardCacheEntry<>(key, value, revision + 1));
+
return true;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java
index 5a88323..74fad06 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java
@@ -37,6 +37,11 @@ public class TestSegmentContent {
testRunner.enqueue(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9});
testRunner.run();
+ testRunner.assertTransferCount(SegmentContent.REL_ORIGINAL, 1);
+ final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SegmentContent.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(SegmentContent.FRAGMENT_ID);
+ originalFlowFile.assertAttributeEquals(SegmentContent.FRAGMENT_COUNT, "3");
+
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS);
assertEquals(3, flowFiles.size());
@@ -57,6 +62,11 @@ public class TestSegmentContent {
testRunner.enqueue(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9});
testRunner.run();
+ testRunner.assertTransferCount(SegmentContent.REL_ORIGINAL, 1);
+ final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SegmentContent.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(SegmentContent.FRAGMENT_ID);
+ originalFlowFile.assertAttributeEquals(SegmentContent.FRAGMENT_COUNT, "1");
+
testRunner.assertTransferCount(SegmentContent.REL_SEGMENTS, 1);
final MockFlowFile out1 = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS).get(0);
out1.assertContentEquals(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9});
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
index 6d9fba9..0dec5a1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
@@ -25,6 +25,9 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Test;
+import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT;
+import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID;
+
public class TestSplitContent {
@Test
@@ -39,6 +42,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
runner.assertQueueEmpty();
@@ -62,6 +66,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
runner.assertQueueEmpty();
@@ -76,6 +81,7 @@ public class TestSplitContent {
runner.enqueue(input);
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a ");
@@ -89,6 +95,7 @@ public class TestSplitContent {
runner.enqueue(input);
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a test");
@@ -102,6 +109,7 @@ public class TestSplitContent {
runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a test");
@@ -115,6 +123,7 @@ public class TestSplitContent {
runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "5");
runner.assertTransferCount(SplitContent.REL_SPLITS, 5);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a ");
@@ -138,6 +147,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "3");
runner.assertTransferCount(SplitContent.REL_SPLITS, 3);
runner.assertQueueEmpty();
@@ -157,6 +167,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@@ -178,6 +189,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@@ -202,6 +214,9 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(FRAGMENT_ID);
+ originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@@ -227,6 +242,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@@ -249,6 +265,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
runner.assertTransferCount(SplitContent.REL_SPLITS, 1);
runner.assertQueueEmpty();
@@ -269,6 +286,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
runner.assertTransferCount(SplitContent.REL_SPLITS, 1);
runner.assertQueueEmpty();
@@ -289,6 +307,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
runner.assertTransferCount(SplitContent.REL_SPLITS, 1);
runner.assertQueueEmpty();
@@ -309,6 +328,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@@ -327,6 +347,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
index 4e0d999..4efd82d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
@@ -35,6 +35,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+
public class TestSplitJson {
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
@@ -86,6 +91,7 @@ public class TestSplitJson {
testRunner.run();
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
+ testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
testRunner.assertTransferCount(SplitJson.REL_SPLIT, 1);
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("0");
@@ -102,6 +108,7 @@ public class TestSplitJson {
int numSplitsExpected = 10;
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
+ testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), String.valueOf(numSplitsExpected));
testRunner.assertTransferCount(SplitJson.REL_SPLIT, numSplitsExpected);
final MockFlowFile originalOut = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0);
originalOut.assertContentEquals(JSON_SNIPPET);
@@ -120,18 +127,21 @@ public class TestSplitJson {
testRunner.run();
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
+ final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
+ originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "7");
+ originalFlowFile.assertContentEquals(JSON_SNIPPET);
testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7);
- testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0);
flowFile.assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
- flowFile.assertAttributeEquals("fragment.count", "7");
- flowFile.assertAttributeEquals("fragment.index", "0");
- flowFile.assertAttributeEquals("segment.original.filename", "test.json");
+ flowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "7");
+ flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), "0");
+ flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), "test.json");
flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(6);
- flowFile.assertAttributeEquals("fragment.count", "7");
- flowFile.assertAttributeEquals("fragment.index", "6");
- flowFile.assertAttributeEquals("segment.original.filename", "test.json");
+ flowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "7");
+ flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), "6");
+ flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), "test.json");
}
@Test
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
index 3951e02..8e4c881 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
@@ -50,6 +50,9 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "3");
+ originalFlowFile.assertAttributeExists(SplitText.FRAGMENT_ID);
runner.assertTransferCount(SplitText.REL_SPLITS, 3);
}
@@ -80,6 +83,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@@ -87,7 +91,7 @@ public class TestSplitText {
splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "86");
splits.get(1).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "3");
splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "54");
- final String fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
+ final String fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID);
for (int i = 0; i < splits.size(); i++) {
final MockFlowFile split = splits.get(i);
split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, String.valueOf(i+1));
@@ -110,6 +114,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "3");
runner.assertTransferCount(SplitText.REL_SPLITS, 3);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@@ -119,7 +124,7 @@ public class TestSplitText {
splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "55");
splits.get(2).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1");
splits.get(2).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "23");
- final String fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
+ final String fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID);
for (int i = 0; i < splits.size(); i++) {
final MockFlowFile split = splits.get(i);
split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, String.valueOf(i + 1));
@@ -152,6 +157,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "6");
runner.assertTransferCount(SplitText.REL_SPLITS, 6);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@@ -176,6 +182,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "1");
runner.assertTransferCount(SplitText.REL_SPLITS, 1);
// repeat with header cou8nt versus header marker
@@ -189,6 +196,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "1");
runner.assertTransferCount(SplitText.REL_SPLITS, 1);
// repeat single header line with no newline characters
@@ -202,6 +210,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "1");
runner.assertTransferCount(SplitText.REL_SPLITS, 1);
}
@@ -218,10 +227,11 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
- String fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
+ String fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID);
for (int i = 0; i < splits.size(); i++) {
final MockFlowFile split = splits.get(i);
split.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5");
@@ -244,10 +254,11 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
- fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
+ fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID);
splits.get(0).assertContentEquals("Header Line #1\nHeader Line #2\nLine #1\nLine #2\nLine #3\nLine #4\nLine #5\n");
splits.get(1).assertContentEquals("Line #6\nLine #7\nLine #8\nLine #9\nLine #10");
splits.get(0).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "7");
@@ -275,6 +286,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitText.REL_SPLITS, 4);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@@ -292,6 +304,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
final List<MockFlowFile> splitsWithNoHeader = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@@ -311,6 +324,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitText.REL_SPLITS, 4);
final List<MockFlowFile> splits =runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@@ -328,6 +342,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
final List<MockFlowFile> splitsWithNoHeader =runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@@ -394,6 +409,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitText.REL_SPLITS, 4);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@@ -420,6 +436,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "11");
runner.assertTransferCount(SplitText.REL_SPLITS, 11);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java
index 2157ab8..281212f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java
@@ -37,6 +37,11 @@ import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+
public class TestSplitXml {
SAXParserFactory factory;
@@ -67,15 +72,18 @@ public class TestSplitXml {
});
runner.run();
runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
+ final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
+ originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "6");
runner.assertTransferCount(SplitXml.REL_SPLIT, 6);
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT));
Arrays.asList(0, 1, 2, 3, 4, 5).forEach((index) -> {
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT).get(index);
- flowFile.assertAttributeEquals("fragment.index", Integer.toString(index));
- flowFile.assertAttributeEquals("fragment.count", "6");
- flowFile.assertAttributeEquals("segment.original.filename", "test.xml");
+ flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), Integer.toString(index));
+ flowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "6");
+ flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), "test.xml");
});
}
@@ -86,6 +94,7 @@ public class TestSplitXml {
runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"));
runner.run();
runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "12");
runner.assertTransferCount(SplitXml.REL_SPLIT, 12);
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));
@@ -99,6 +108,7 @@ public class TestSplitXml {
runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"));
runner.run();
runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "12");
runner.assertTransferCount(SplitXml.REL_SPLIT, 12);
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));
@@ -118,6 +128,7 @@ public class TestSplitXml {
runner.enqueue(Paths.get("src/test/resources/TestXml/namespace.xml"));
runner.run();
runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "2");
runner.assertTransferCount(SplitXml.REL_SPLIT, 2);
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
index e91a6e6..587d050 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.standard;
+import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT;
+import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -57,10 +59,14 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
+ unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
+ autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -95,10 +101,14 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
+ unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
+ autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -138,10 +148,14 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
+ unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
+ autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -174,10 +188,14 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
+ unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
+ autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -211,6 +229,8 @@ public class TestUnpackContent {
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
+ runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -235,6 +255,8 @@ public class TestUnpackContent {
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
+ runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
+ runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -258,6 +280,7 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+ unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@@ -292,6 +315,9 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
+ final MockFlowFile originalFlowFile = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(FRAGMENT_ID);
+ originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index f42740b..e1117d5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -24,14 +24,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+import org.apache.nifi.processors.standard.TestNotify.MockCacheClient;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -60,7 +53,7 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "1");
- runner.enqueue(new byte[] {},props);
+ runner.enqueue(new byte[]{}, props);
runner.run();
@@ -76,7 +69,7 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "1");
- runner.enqueue(new byte[] {},props);
+ runner.enqueue(new byte[]{}, props);
runner.run();
@@ -94,6 +87,45 @@ public class TestWait {
}
@Test
+ public void testCounterExpired() throws InitializationException, InterruptedException, IOException {
+ runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+ runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "5");
+ runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");
+
+ final Map<String, String> props = new HashMap<>();
+ props.put("releaseSignalAttribute", "notification-id");
+ runner.enqueue(new byte[]{}, props);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+
+ runner.clearTransferState();
+ runner.enqueue(ff);
+
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+ final Map<String, String> signalAttributes = new HashMap<>();
+ signalAttributes.put("signal-attr-1", "signal-attr-1-value");
+ signalAttributes.put("signal-attr-2", "signal-attr-2-value");
+ protocol.notify("notification-id", "counter-A", 1, signalAttributes);
+ protocol.notify("notification-id", "counter-B", 2, signalAttributes);
+
+ Thread.sleep(101L);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
+ ff = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
+ // Even if wait didn't complete, signal attributes should be set
+ ff.assertAttributeEquals("wait.counter.total", "3");
+ ff.assertAttributeEquals("wait.counter.counter-A", "1");
+ ff.assertAttributeEquals("wait.counter.counter-B", "2");
+ ff.assertAttributeEquals("signal-attr-1", "signal-attr-1-value");
+ ff.assertAttributeEquals("signal-attr-2", "signal-attr-2-value");
+ runner.clearTransferState();
+ }
+
+ @Test
public void testBadWaitStartTimestamp() throws InitializationException, InterruptedException {
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");
@@ -101,7 +133,7 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "1");
props.put("wait.start.timestamp", "blue bunny");
- runner.enqueue(new byte[] {},props);
+ runner.enqueue(new byte[]{}, props);
runner.run();
@@ -114,11 +146,12 @@ public class TestWait {
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
final Map<String, String> props = new HashMap<>();
- runner.enqueue(new byte[] {},props);
+ runner.enqueue(new byte[]{}, props);
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
+ runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
runner.clearTransferState();
}
@@ -129,12 +162,13 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "2");
- runner.enqueue(new byte[] {}, props);
+ runner.enqueue(new byte[]{}, props);
runner.run();
//Expect the processor to receive an IO exception from the cache service and route to failure
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
runner.assertTransferCount(Wait.REL_FAILURE, 1);
+ runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
service.setFailOnCalls(false);
}
@@ -146,7 +180,10 @@ public class TestWait {
cachedAttributes.put("uuid", "notifyUuid");
cachedAttributes.put("notify.only", "notifyValue");
- service.put("key", cachedAttributes, new Wait.StringSerializer(), new FlowFileAttributesSerializer());
+ // Setup existing cache entry.
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+ protocol.notify("key", "default", 1, cachedAttributes);
+
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_REPLACE.getValue());
@@ -159,7 +196,7 @@ public class TestWait {
runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
// make sure the key is in the cache before Wait runs
- assertNotNull(service.get("key", new Wait.StringSerializer(), new FlowFileAttributesSerializer()));
+ assertNotNull(protocol.getSignal("key"));
runner.run();
@@ -180,7 +217,7 @@ public class TestWait {
runner.clearTransferState();
// make sure Wait removed this key from the cache
- assertNull(service.get("key", new Wait.StringSerializer(), new FlowFileAttributesSerializer()));
+ assertNull(protocol.getSignal("key"));
}
@Test
@@ -190,7 +227,10 @@ public class TestWait {
cachedAttributes.put("uuid", "notifyUuid");
cachedAttributes.put("notify.only", "notifyValue");
- service.put("key", cachedAttributes, new Wait.StringSerializer(), new FlowFileAttributesSerializer());
+ // Setup existing cache entry.
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+ protocol.notify("key", "default", 1, cachedAttributes);
+
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue());
@@ -221,70 +261,181 @@ public class TestWait {
runner.clearTransferState();
}
- private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
- private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
- private boolean failOnCalls = false;
-
- public void setFailOnCalls(boolean failOnCalls){
- this.failOnCalls = failOnCalls;
- }
-
-
- private void verifyNotFail() throws IOException {
- if (failOnCalls) {
- throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
- }
- }
-
- @Override
- public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
- verifyNotFail();
- final Object retValue = values.putIfAbsent(key, value);
- return (retValue == null);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
- final Deserializer<V> valueDeserializer) throws IOException {
- verifyNotFail();
- return (V) values.putIfAbsent(key, value);
- }
-
- @Override
- public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
- verifyNotFail();
- return values.containsKey(key);
- }
-
- @Override
- public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
- verifyNotFail();
- values.put(key, value);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
- verifyNotFail();
- if(values.containsKey(key)) {
- return (V) values.get(key);
- } else {
- return null;
- }
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
- verifyNotFail();
- values.remove(key);
- return true;
- }
+ @Test
+ public void testWaitForTotalCount() throws InitializationException, IOException {
+ Map<String, String> cachedAttributes = new HashMap<>();
+ cachedAttributes.put("both", "notifyValue");
+ cachedAttributes.put("uuid", "notifyUuid");
+ cachedAttributes.put("notify.only", "notifyValue");
+
+ // Setup existing cache entry.
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+ protocol.notify("key", "counter-A", 1, cachedAttributes);
+
+ runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+ runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
+
+ final Map<String, String> waitAttributes = new HashMap<>();
+ waitAttributes.put("releaseSignalAttribute", "key");
+ waitAttributes.put("targetSignalCount", "3");
+ waitAttributes.put("wait.only", "waitValue");
+ waitAttributes.put("both", "waitValue");
+ waitAttributes.put("uuid", UUID.randomUUID().toString());
+ String flowFileContent = "content";
+ runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
+
+ /*
+ * 1st iteration
+ */
+ runner.run();
+ runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+
+ /*
+ * 2nd iteration.
+ */
+ runner.clearTransferState();
+ runner.enqueue(waitingFlowFile);
+
+ // Notify with other counter.
+ protocol.notify("key", "counter-B", 1, cachedAttributes);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ // Still waiting since total count doesn't reach to 3.
+ waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+
+ /*
+ * 3rd iteration.
+ */
+ runner.clearTransferState();
+ runner.enqueue(waitingFlowFile);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ // Still waiting since total count doesn't reach to 3.
+ waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+
+ /*
+ * 4th iteration.
+ */
+ runner.clearTransferState();
+ runner.enqueue(waitingFlowFile);
+
+ // Notify with other counter.
+ protocol.notify("key", "counter-C", 1, cachedAttributes);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
+
+ final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+
+ // show a new attribute was copied from the cache
+ assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
+ // show that uuid was not overwritten
+ assertEquals(waitAttributes.get("uuid"), outputFlowFile.getAttribute("uuid"));
+ // show that the original attributes are still there
+ assertEquals("waitValue", outputFlowFile.getAttribute("wait.only"));
+ // show that the original attribute is kept
+ assertEquals("waitValue", outputFlowFile.getAttribute("both"));
+ runner.clearTransferState();
+
+ assertNull("The key no longer exist", protocol.getSignal("key"));
}
+ @Test
+ public void testWaitForSpecificCount() throws InitializationException, IOException {
+ Map<String, String> cachedAttributes = new HashMap<>();
+ cachedAttributes.put("both", "notifyValue");
+ cachedAttributes.put("uuid", "notifyUuid");
+ cachedAttributes.put("notify.only", "notifyValue");
+
+ // Setup existing cache entry.
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+ protocol.notify("key", "counter-A", 1, cachedAttributes);
+
+ runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+ runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
+ runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
+
+ final Map<String, String> waitAttributes = new HashMap<>();
+ waitAttributes.put("releaseSignalAttribute", "key");
+ waitAttributes.put("targetSignalCount", "2");
+ waitAttributes.put("signalCounterName", "counter-B");
+ waitAttributes.put("wait.only", "waitValue");
+ waitAttributes.put("both", "waitValue");
+ waitAttributes.put("uuid", UUID.randomUUID().toString());
+ String flowFileContent = "content";
+ runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
+
+ /*
+ * 1st iteration
+ */
+ runner.run();
+ runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+
+ /*
+ * 2nd iteration.
+ */
+ runner.clearTransferState();
+ runner.enqueue(waitingFlowFile);
+
+ // Notify with target counter.
+ protocol.notify("key", "counter-B", 1, cachedAttributes);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ // Still waiting since counter-B doesn't reach to 2.
+ waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+
+ /*
+ * 3rd iteration.
+ */
+ runner.clearTransferState();
+ runner.enqueue(waitingFlowFile);
+
+ // Notify with other counter.
+ protocol.notify("key", "counter-C", 1, cachedAttributes);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ // Still waiting since total count doesn't reach to 3.
+ waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+
+ /*
+ * 4th iteration.
+ */
+ runner.clearTransferState();
+ runner.enqueue(waitingFlowFile);
+
+ // Notify with target counter.
+ protocol.notify("key", "counter-B", 1, cachedAttributes);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
+
+ final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+
+ // show a new attribute was copied from the cache
+ assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
+ // show that uuid was not overwritten
+ assertEquals(waitAttributes.get("uuid"), outputFlowFile.getAttribute("uuid"));
+ // show that the original attributes are still there
+ assertEquals("waitValue", outputFlowFile.getAttribute("wait.only"));
+ // show that the original attribute is kept
+ assertEquals("waitValue", outputFlowFile.getAttribute("both"));
+
+ outputFlowFile.assertAttributeEquals("wait.counter.total", "4");
+ outputFlowFile.assertAttributeEquals("wait.counter.counter-A", "1");
+ outputFlowFile.assertAttributeEquals("wait.counter.counter-B", "2");
+ outputFlowFile.assertAttributeEquals("wait.counter.counter-C", "1");
+
+ runner.clearTransferState();
+
+ assertNull("The key no longer exist", protocol.getSignal("key"));
+
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
new file mode 100644
index 0000000..d4bc783
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry;
+import org.apache.nifi.distributed.cache.client.StandardCacheEntry;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
+import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+public class TestWaitNotifyProtocol {
+
+ private final Map<String, CacheEntry<String, String>> cacheEntries = new HashMap<>();
+
+ private AtomicDistributedMapCacheClient cache;
+ private final Answer successfulReplace = invocation -> {
+ final String key = invocation.getArgumentAt(0, String.class);
+ final String value = invocation.getArgumentAt(1, String.class);
+ final Long revision = invocation.getArgumentAt(4, Long.class);
+ cacheEntries.put(key, new StandardCacheEntry<>(key, value, revision + 1));
+ return true;
+ };
+
+ @Before
+ public void before() throws Exception {
+ cacheEntries.clear();
+
+ // Default mock implementations.
+ cache = mock(AtomicDistributedMapCacheClient.class);
+ doAnswer(invocation -> {
+ final CacheEntry<String, String> entry = cacheEntries.get(invocation.getArguments()[0]);
+ return entry;
+ }).when(cache).fetch(any(), any(), any());
+ }
+
+ @Test
+ public void testNotifyRetryFailure() throws Exception {
+
+ // replace always return false.
+ doAnswer(invocation -> false)
+ .when(cache).replace(any(), any(), any(), any(), anyLong());
+
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
+
+ final String signalId = "signal-id";
+ try {
+ protocol.notify(signalId, "a", 1, null);
+ fail("Notify should fail after retrying few times.");
+ } catch (ConcurrentModificationException e) {
+ }
+ }
+
+ @Test
+ public void testNotifyFirst() throws Exception {
+
+ doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());
+
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
+
+ final String signalId = "signal-id";
+ final Signal signal = protocol.notify(signalId, "a", 1, null);
+
+ assertNotNull(signal);
+ assertEquals(Long.valueOf(1), signal.getCounts().get("a"));
+ assertTrue(cacheEntries.containsKey("signal-id"));
+
+ final CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
+
+ assertEquals(0, cacheEntry.getRevision());
+ assertEquals("{\"counts\":{\"a\":1},\"attributes\":{}}", cacheEntry.getValue());
+ }
+
+ @Test
+ public void testNotifyCounters() throws Exception {
+
+ doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());
+
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
+
+ final String signalId = "signal-id";
+
+ protocol.notify(signalId, "a", 1, null);
+ protocol.notify(signalId, "a", 1, null);
+
+ CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
+ assertEquals(1, cacheEntry.getRevision());
+ assertEquals("{\"counts\":{\"a\":2},\"attributes\":{}}", cacheEntry.getValue());
+
+ protocol.notify(signalId, "a", 10, null);
+
+ cacheEntry = cacheEntries.get("signal-id");
+ assertEquals(2, cacheEntry.getRevision());
+ assertEquals("{\"counts\":{\"a\":12},\"attributes\":{}}", cacheEntry.getValue());
+
+ protocol.notify(signalId, "b", 2, null);
+ protocol.notify(signalId, "c", 3, null);
+
+ cacheEntry = cacheEntries.get("signal-id");
+ assertEquals(4, cacheEntry.getRevision());
+ assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
+
+ }
+
+ @Test
+ public void testNotifyAttributes() throws Exception {
+ doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());
+
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
+
+ final String signalId = "signal-id";
+
+ final Map<String, String> attributeA1 = new HashMap<>();
+ attributeA1.put("p1", "a1");
+ attributeA1.put("p2", "a1");
+
+ protocol.notify(signalId, "a", 1, attributeA1);
+
+ CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
+ assertEquals(0, cacheEntry.getRevision());
+ assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"}}", cacheEntry.getValue());
+
+ final Map<String, String> attributeA2 = new HashMap<>();
+ attributeA2.put("p2", "a2"); // Update p2
+ attributeA2.put("p3", "a2"); // Add p3
+
+ // Notify again
+ protocol.notify(signalId, "a", 1, attributeA2);
+
+ cacheEntry = cacheEntries.get("signal-id");
+ assertEquals(1, cacheEntry.getRevision());
+ assertEquals("Updated attributes should be merged correctly",
+ "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"}}", cacheEntry.getValue());
+
+ }
+
+ @Test
+ public void testSignalCount() throws Exception {
+ doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());
+
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
+
+ final String signalId = "signal-id";
+
+ Signal signal = protocol.getSignal(signalId);
+ assertNull("Should be null since there's no signal yet", signal);
+
+ // First notification.
+ protocol.notify(signalId, "success", 1, null);
+
+ signal = protocol.getSignal(signalId);
+ assertNotNull(signal);
+ assertEquals(1, signal.getCount("success"));
+ assertTrue(signal.isCountReached("success", 1));
+ assertFalse(signal.isCountReached("success", 2));
+ assertTrue(signal.isTotalCountReached(1));
+ assertFalse(signal.isTotalCountReached(2));
+
+ // Notify again with different counter name.
+ protocol.notify(signalId, "failure", 1, null);
+
+ signal = protocol.getSignal(signalId);
+ assertNotNull(signal);
+ assertEquals(1, signal.getCount("success"));
+ assertEquals(1, signal.getCount("failure"));
+ assertTrue(signal.isCountReached("failure", 1));
+ assertFalse(signal.isCountReached("failure", 2));
+ assertTrue(signal.isTotalCountReached(1));
+ assertTrue(signal.isTotalCountReached(2));
+
+ }
+
+ /**
+ * Test migration across NiFi version upgrade.
+ * Old version of Wait/Notify processors use FlowFileAttributesSerializer for cache entries.
+ * New version uses StringSerializer. WaitNotifyProtocol should be able to migrate old cache entries.
+ */
+ @Test
+ public void testNiFiVersionUpgrade() throws Exception {
+ doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());
+
+ // Simulate old cache entry.
+ final FlowFileAttributesSerializer attributesSerializer = new FlowFileAttributesSerializer();
+ final Map<String, String> cachedAttributes = new HashMap<>();
+ cachedAttributes.put("key1", "value1");
+ cachedAttributes.put("key2", "value2");
+ cachedAttributes.put("key3", "value3");
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ attributesSerializer.serialize(cachedAttributes, bos);
+
+ final String signalId = "old-entry";
+ cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0));
+
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
+ final Signal signal = protocol.getSignal(signalId);
+
+ assertEquals(1, signal.getCount(WaitNotifyProtocol.DEFAULT_COUNT_NAME));
+ assertEquals("value1", signal.getAttributes().get("key1"));
+ assertEquals("value2", signal.getAttributes().get("key2"));
+ assertEquals("value3", signal.getAttributes().get("key3"));
+
+ cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0));
+ try {
+ protocol.getSignal(signalId);
+ fail("Should fail since cached value was not in expected format.");
+ } catch (DeserializationException e) {
+ }
+
+ }
+
+}
\ No newline at end of file
[2/2] nifi git commit: NIFI-3216: Add N signals to Wait/Notify
Posted by bb...@apache.org.
NIFI-3216: Add N signals to Wait/Notify
- Support counters at Wait/Notify processors so that NiFi flow can be
configured to wait for N signals
- Extract Wait/Notify logics into WaitNotifyProtocol
- Added FragmentAttributes to manage commonly used fragment attributes
- Changed existing split processors to set 'fragment.identifier' and
'fragment.count', so that Wait can use those to wait for all splits
get processed
This closes #1420.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f0171ff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f0171ff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f0171ff
Branch: refs/heads/master
Commit: 7f0171ffa25bfd8932e49b3367049b101799dea4
Parents: e62eeb7
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri Jan 13 16:52:30 2017 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Jan 19 15:17:59 2017 -0500
----------------------------------------------------------------------
.../flowfile/attributes/FragmentAttributes.java | 73 +++++
.../apache/nifi/processors/avro/SplitAvro.java | 17 +-
.../nifi/processors/avro/TestSplitAvro.java | 13 +-
.../processors/standard/ConvertJSONToSQL.java | 12 +-
.../nifi/processors/standard/MergeContent.java | 9 +-
.../apache/nifi/processors/standard/Notify.java | 52 ++--
.../apache/nifi/processors/standard/PutSQL.java | 7 +-
.../processors/standard/SegmentContent.java | 10 +-
.../nifi/processors/standard/SplitContent.java | 18 +-
.../nifi/processors/standard/SplitJson.java | 18 +-
.../nifi/processors/standard/SplitText.java | 21 +-
.../nifi/processors/standard/SplitXml.java | 19 +-
.../nifi/processors/standard/UnpackContent.java | 11 +-
.../apache/nifi/processors/standard/Wait.java | 163 +++++++---
.../processors/standard/WaitNotifyProtocol.java | 193 ++++++++++++
.../standard/TestConvertJSONToSQL.java | 21 ++
.../nifi/processors/standard/TestNotify.java | 128 ++++++--
.../processors/standard/TestSegmentContent.java | 10 +
.../processors/standard/TestSplitContent.java | 21 ++
.../nifi/processors/standard/TestSplitJson.java | 24 +-
.../nifi/processors/standard/TestSplitText.java | 25 +-
.../nifi/processors/standard/TestSplitXml.java | 17 +-
.../processors/standard/TestUnpackContent.java | 26 ++
.../nifi/processors/standard/TestWait.java | 311 ++++++++++++++-----
.../standard/TestWaitNotifyProtocol.java | 245 +++++++++++++++
25 files changed, 1214 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java
new file mode 100644
index 0000000..6f055da
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This enum class contains flow file attribute keys commonly used among Split processors.
+ */
+public enum FragmentAttributes implements FlowFileAttributeKey {
+
+ /**
+ * The number of bytes from the original FlowFile that were copied to this FlowFile,
+ * including header, if applicable, which is duplicated in each split FlowFile.
+ */
+ FRAGMENT_SIZE("fragment.size"),
+ /**
+ * All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute.
+ */
+ FRAGMENT_ID("fragment.identifier"),
+ /**
+ * A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile.
+ */
+ FRAGMENT_INDEX("fragment.index"),
+ /**
+ * The number of split FlowFiles generated from the parent FlowFile.
+ */
+ FRAGMENT_COUNT("fragment.count"),
+ /**
+ * The filename of the parent FlowFile.
+ */
+ SEGMENT_ORIGINAL_FILENAME("segment.original.filename");
+
+ private final String key;
+
+ FragmentAttributes(final String key) {
+ this.key = key;
+ }
+
+ @Override
+ public String key() {
+ return key;
+ }
+
+ public static FlowFile copyAttributesToOriginal(final ProcessSession processSession, final FlowFile originalFlowFile,
+ final String fragmentId, final int fragmentCount) {
+ final Map<String, String> attributesToOriginal = new HashMap<>();
+ if (fragmentId != null && fragmentId.length() > 0) {
+ attributesToOriginal.put(FRAGMENT_ID.key(), fragmentId);
+ }
+ attributesToOriginal.put(FRAGMENT_COUNT.key(), String.valueOf(fragmentCount));
+ return processSession.putAllAttributes(originalFlowFile, attributesToOriginal);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
index 83964fa..3a28917 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
@@ -64,6 +64,12 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
@SideEffectFree
@SupportsBatching
@Tags({ "avro", "split" })
@@ -217,13 +223,14 @@ public class SplitAvro extends AbstractProcessor {
final String fragmentIdentifier = UUID.randomUUID().toString();
IntStream.range(0, splits.size()).forEach((i) -> {
FlowFile split = splits.get(i);
- split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
- split = session.putAttribute(split, "fragment.index", Integer.toString(i));
- split = session.putAttribute(split, "segment.original.filename", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
- split = session.putAttribute(split, "fragment.count", Integer.toString(splits.size()));
+ split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier);
+ split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(i));
+ split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(splits.size()));
session.transfer(split, REL_SPLIT);
});
- session.transfer(flowFile, REL_ORIGINAL);
+ final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size());
+ session.transfer(originalFlowFile, REL_ORIGINAL);
} catch (ProcessException e) {
getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
session.transfer(flowFile, REL_FAILURE);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
index 32d43e3..c17842d 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
@@ -44,6 +44,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.stream.IntStream;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.junit.Assert.assertEquals;
public class TestSplitAvro {
@@ -116,6 +118,9 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
+ originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 1, true);
final String fragmentIdentifier = flowFiles.get(0).getAttribute("fragment.identifier");
@@ -123,7 +128,7 @@ public class TestSplitAvro {
MockFlowFile flowFile = flowFiles.get(i);
assertEquals(i, Integer.parseInt(flowFile.getAttribute("fragment.index")));
assertEquals(fragmentIdentifier, flowFile.getAttribute("fragment.identifier"));
- assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute("fragment.count")));
+ assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute(FRAGMENT_COUNT.key())));
assertEquals(filename, flowFile.getAttribute("segment.original.filename"));
});
}
@@ -140,6 +145,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 20, true);
}
@@ -156,6 +162,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 100, true);
}
@@ -172,6 +179,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 1, false);
@@ -197,6 +205,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 1, true);
@@ -215,6 +224,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 20, true);
@@ -234,6 +244,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 20, false);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 2db7bcc..1ce6fb5 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -65,6 +65,11 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
@SideEffectFree
@SupportsBatching
@SeeAlso(PutSQL.class)
@@ -369,9 +374,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
attributes.put("sql.table", tableName);
- attributes.put("fragment.identifier", fragmentIdentifier);
- attributes.put("fragment.count", String.valueOf(arrayNode.size()));
- attributes.put("fragment.index", String.valueOf(i));
+ attributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
+ attributes.put(FRAGMENT_COUNT.key(), String.valueOf(arrayNode.size()));
+ attributes.put(FRAGMENT_INDEX.key(), String.valueOf(i));
if (catalog != null) {
attributes.put("sql.catalog", catalog);
@@ -381,6 +386,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
session.transfer(sqlFlowFile, REL_SQL);
}
+ flowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, arrayNode.size());
session.transfer(flowFile, REL_ORIGINAL);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 16bb911..f18416e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -67,6 +67,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -127,15 +128,15 @@ import org.apache.nifi.util.FlowFilePackagerV3;
public class MergeContent extends BinFiles {
// preferred attributes
- public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
- public static final String FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
- public static final String FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
+ public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
// old style attributes
public static final String SEGMENT_ID_ATTRIBUTE = "segment.identifier";
public static final String SEGMENT_INDEX_ATTRIBUTE = "segment.index";
public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
"Bin-Packing Algorithm",
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
index 32cd249..9d809ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
@@ -17,8 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,9 +35,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -49,7 +45,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
@EventDriven
@SupportsBatching
@@ -67,7 +62,7 @@ public class Notify extends AbstractProcessor {
.name("Distributed Cache Service")
.description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor")
.required(true)
- .identifiesControllerService(DistributedMapCacheClient.class)
+ .identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
@@ -80,6 +75,18 @@ public class Notify extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
+ public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
+ .name("Signal Counter Name")
+ .description("A value, or the results of an Attribute Expression Language statement, which will " +
+ "be evaluated against a FlowFile in order to determine the signal counter name. " +
+ "Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " +
+ "of different types of events, such as success or failure, or destination data source names, etc.")
+ .required(true)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+ .expressionLanguageSupported(true)
+ .defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME)
+ .build();
+
// Specifies an optional regex used to identify which attributes to cache
public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder()
.name("Attribute Cache Regex")
@@ -103,9 +110,6 @@ public class Notify extends AbstractProcessor {
private final Set<Relationship> relationships;
- private final Serializer<String> keySerializer = new StringSerializer();
- private final Serializer<Map<String, String>> valueSerializer = new FlowFileAttributesSerializer();
-
public Notify() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
@@ -117,6 +121,7 @@ public class Notify extends AbstractProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
+ descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_CACHE_REGEX);
return descriptors;
@@ -137,11 +142,12 @@ public class Notify extends AbstractProcessor {
final ComponentLog logger = getLogger();
- // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
- final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+ // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
+ final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+ final String counterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
// if the computed value is null, or empty, we transfer the flow file to failure relationship
- if (StringUtils.isBlank(cacheKey)) {
+ if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
@@ -149,7 +155,8 @@ public class Notify extends AbstractProcessor {
}
// the cache client used to interact with the distributed cache
- final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+ final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
try {
final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet())
@@ -164,10 +171,12 @@ public class Notify extends AbstractProcessor {
}
if (logger.isDebugEnabled()) {
- logger.debug("Cached release signal identifier {} from FlowFile {}", new Object[] {cacheKey, flowFile});
+ logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[] {signalId, counterName, flowFile});
}
- cache.put(cacheKey, attributesToCache, keySerializer, valueSerializer);
+ // In case of ConcurrentModificationException, just throw the exception so that processor can
+ // retry after yielding for a while.
+ protocol.notify(signalId, counterName, 1, attributesToCache);
session.transfer(flowFile, REL_SUCCESS);
} catch (final IOException e) {
@@ -177,15 +186,4 @@ public class Notify extends AbstractProcessor {
}
}
- /**
- * Simple string serializer, used for serializing the cache key
- */
- public static class StringSerializer implements Serializer<String> {
-
- @Override
- public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
- out.write(value.getBytes(StandardCharsets.UTF_8));
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index adfac05..76901fe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
@@ -161,9 +162,9 @@ public class PutSQL extends AbstractProcessor {
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
- private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
- private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
- private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
+ private static final String FRAGMENT_ID_ATTR = FragmentAttributes.FRAGMENT_ID.key();
+ private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key();
+ private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key();
private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$");
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
index 4e25d3c..1fc1feb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
@@ -38,6 +38,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@@ -80,11 +81,11 @@ public class SegmentContent extends AbstractProcessor {
public static final String SEGMENT_ID = "segment.identifier";
public static final String SEGMENT_INDEX = "segment.index";
public static final String SEGMENT_COUNT = "segment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
- public static final String FRAGMENT_ID = "fragment.identifier";
- public static final String FRAGMENT_INDEX = "fragment.index";
- public static final String FRAGMENT_COUNT = "fragment.count";
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder()
.name("Segment Size")
@@ -180,6 +181,7 @@ public class SegmentContent extends AbstractProcessor {
}
session.transfer(segmentSet, REL_SEGMENTS);
+ flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, segmentId, totalSegments);
session.transfer(flowFile, REL_ORIGINAL);
if (totalSegments <= 10) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
index 52b124e..d20fe8c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
@@ -50,6 +50,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -77,10 +78,10 @@ import org.apache.nifi.util.Tuple;
public class SplitContent extends AbstractProcessor {
// attribute keys
- public static final String FRAGMENT_ID = "fragment.identifier";
- public static final String FRAGMENT_INDEX = "fragment.index";
- public static final String FRAGMENT_COUNT = "fragment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes");
static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text");
@@ -182,7 +183,7 @@ public class SplitContent extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
- final FlowFile flowFile = session.get();
+ FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
@@ -285,8 +286,9 @@ public class SplitContent extends AbstractProcessor {
splitList.add(finalSplit);
}
- finishFragmentAttributes(session, flowFile, splitList);
+ final String fragmentId = finishFragmentAttributes(session, flowFile, splitList);
session.transfer(splitList, REL_SPLITS);
+ flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, splitList.size());
session.transfer(flowFile, REL_ORIGINAL);
if (splitList.size() > 10) {
@@ -302,8 +304,9 @@ public class SplitContent extends AbstractProcessor {
* @param session session
* @param source source
* @param splits splits
+ * @return generated fragment identifier for the splits
*/
- private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
+ private String finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
final String fragmentId = UUID.randomUUID().toString();
@@ -319,6 +322,7 @@ public class SplitContent extends AbstractProcessor {
FlowFile newFF = session.putAllAttributes(ff, attributes);
splits.add(newFF);
}
+ return fragmentId;
}
static class HexStringPropertyValidator implements Validator {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index 003834e..e7df459 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -56,6 +56,12 @@ import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
@EventDriven
@SideEffectFree
@SupportsBatching
@@ -166,7 +172,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
@Override
public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
- final FlowFile original = processSession.get();
+ FlowFile original = processSession.get();
if (original == null) {
return;
}
@@ -202,8 +208,9 @@ public class SplitJson extends AbstractJsonPathProcessor {
List resultList = (List) jsonPathResult;
Map<String, String> attributes = new HashMap<>();
- attributes.put("fragment.identifier", UUID.randomUUID().toString());
- attributes.put("fragment.count", Integer.toString(resultList.size()));
+ final String fragmentId = UUID.randomUUID().toString();
+ attributes.put(FRAGMENT_ID.key(), fragmentId);
+ attributes.put(FRAGMENT_COUNT.key(), Integer.toString(resultList.size()));
for (int i = 0; i < resultList.size(); i++) {
Object resultSegment = resultList.get(i);
@@ -213,11 +220,12 @@ public class SplitJson extends AbstractJsonPathProcessor {
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
}
);
- attributes.put("segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
- attributes.put("fragment.index", Integer.toString(i));
+ attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key()));
+ attributes.put(FRAGMENT_INDEX.key(), Integer.toString(i));
processSession.transfer(processSession.putAllAttributes(split, attributes), REL_SPLIT);
}
+ original = copyAttributesToOriginal(processSession, original, fragmentId, resultList.size());
processSession.transfer(original, REL_ORIGINAL);
logger.info("Split {} into {} FlowFiles", new Object[]{original, resultList.size()});
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index ddb770d..e57841f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@ -48,6 +48,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@@ -86,11 +87,11 @@ import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
public class SplitText extends AbstractProcessor {
// attribute keys
public static final String SPLIT_LINE_COUNT = "text.line.count";
- public static final String FRAGMENT_SIZE = "fragment.size";
- public static final String FRAGMENT_ID = "fragment.identifier";
- public static final String FRAGMENT_INDEX = "fragment.index";
- public static final String FRAGMENT_COUNT = "fragment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String FRAGMENT_SIZE = FragmentAttributes.FRAGMENT_SIZE.key();
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder()
.name("Line Split Count")
@@ -250,8 +251,10 @@ public class SplitText extends AbstractProcessor {
if (error.get()){
processSession.transfer(sourceFlowFile, REL_FAILURE);
} else {
- List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
- processSession.transfer(sourceFlowFile, REL_ORIGINAL);
+ final String fragmentId = UUID.randomUUID().toString();
+ List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
+ final FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal(processSession, sourceFlowFile, fragmentId, splitFlowFiles.size());
+ processSession.transfer(originalFlowFile, REL_ORIGINAL);
if (!splitFlowFiles.isEmpty()) {
processSession.transfer(splitFlowFiles, REL_SPLITS);
}
@@ -279,7 +282,8 @@ public class SplitText extends AbstractProcessor {
* it signifies the header information and its contents will be included in
* each and every computed split.
*/
- private List<FlowFile> generateSplitFlowFiles(FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
+ private List<FlowFile> generateSplitFlowFiles(final String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo,
+ List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
List<FlowFile> splitFlowFiles = new ArrayList<>();
FlowFile headerFlowFile = null;
long headerCrlfLength = 0;
@@ -288,7 +292,6 @@ public class SplitText extends AbstractProcessor {
headerCrlfLength = splitInfo.trimmedLength;
}
int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
- String fragmentId = UUID.randomUUID().toString();
if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) {
FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
index 0f0032a..502f7f3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
@@ -64,6 +64,12 @@ import org.xml.sax.Locator;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
@EventDriven
@SideEffectFree
@SupportsBatching
@@ -161,9 +167,9 @@ public class SplitXml extends AbstractProcessor {
final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> {
FlowFile split = session.create(original);
split = session.write(split, out -> out.write(xmlTree.getBytes("UTF-8")));
- split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
- split = session.putAttribute(split, "fragment.index", Integer.toString(numberOfRecords.getAndIncrement()));
- split = session.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
+ split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier);
+ split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(numberOfRecords.getAndIncrement()));
+ split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key()));
splits.add(split);
}, depth);
@@ -188,12 +194,13 @@ public class SplitXml extends AbstractProcessor {
session.remove(splits);
} else {
splits.forEach((split) -> {
- split = session.putAttribute(split, "fragment.count", Integer.toString(numberOfRecords.get()));
+ split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(numberOfRecords.get()));
session.transfer(split, REL_SPLIT);
});
- session.transfer(original, REL_ORIGINAL);
- logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()});
+ final FlowFile originalToTransfer = copyAttributesToOriginal(session, original, fragmentIdentifier, numberOfRecords.get());
+ session.transfer(originalToTransfer, REL_ORIGINAL);
+ logger.info("Split {} into {} FlowFiles", new Object[]{originalToTransfer, splits.size()});
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 0437ed1..18015fa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -53,6 +53,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -97,10 +98,10 @@ import org.apache.nifi.util.FlowFileUnpackagerV3;
@SeeAlso(MergeContent.class)
public class UnpackContent extends AbstractProcessor {
// attribute keys
- public static final String FRAGMENT_ID = "fragment.identifier";
- public static final String FRAGMENT_INDEX = "fragment.index";
- public static final String FRAGMENT_COUNT = "fragment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final String AUTO_DETECT_FORMAT_NAME = "use mime.type attribute";
public static final String TAR_FORMAT_NAME = "tar";
@@ -262,6 +263,8 @@ public class UnpackContent extends AbstractProcessor {
finishFragmentAttributes(session, flowFile, unpacked);
}
session.transfer(unpacked, REL_SUCCESS);
+ final String fragmentId = unpacked.size() > 0 ? unpacked.get(0).getAttribute(FRAGMENT_ID) : null;
+ flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size());
session.transfer(flowFile, REL_ORIGINAL);
session.getProvenanceReporter().fork(flowFile, unpacked);
logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked});
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 4a4f803..0bd5ca6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -17,8 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
@@ -35,15 +34,13 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -53,19 +50,30 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal "
- + "is stored in the distributed cache from a corresponding Notify processor. At this point, a waiting FlowFile is routed to "
- + "the 'success' relationship, with attributes copied from the FlowFile that produced "
- + "the release signal from the Notify processor. The release signal entry is then removed from "
- + "the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration.")
-@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
- + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile.")
+ + "is stored in the distributed cache from a corresponding Notify processor. "
+ + "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship, "
+ + "with attributes copied from the FlowFile that produced the release signal from the Notify processor. "
+ + "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+
+ + "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
+ + "This is particularly useful with processors that split a source flow file into multiple fragments, such as SplitText. "
+ + "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to "
+ + "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value "
+ + "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor."
+)
+@WritesAttributes({
+ @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+ + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile."),
+ @WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, "
+ + "each count value in the signal is copied.")
+})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.Notify"})
public class Wait extends AbstractProcessor {
@@ -77,7 +85,7 @@ public class Wait extends AbstractProcessor {
.name("Distributed Cache Service")
.description("The Controller Service that is used to check for release signals from a corresponding Notify processor")
.required(true)
- .identifiesControllerService(DistributedMapCacheClient.class)
+ .identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
@@ -90,6 +98,29 @@ public class Wait extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
+ public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
+ .name("Target Signal Count")
+ .description("A value, or the results of an Attribute Expression Language statement, which will " +
+ "be evaluated against a FlowFile in order to determine the target signal count. " +
+ "This processor checks whether the signal count has reached this number. " +
+ "If Signal Counter Name is specified, this processor checks a particular counter, " +
+ "otherwise checks against total count in a signal.")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
+ .name("Signal Counter Name")
+ .description("A value, or the results of an Attribute Expression Language statement, which will " +
+ "be evaluated against a FlowFile in order to determine the signal counter name. " +
+ "If not specified, this processor checks the total count in a signal.")
+ .required(false)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+ .expressionLanguageSupported(true)
+ .build();
+
// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
.name("Expiration Duration")
@@ -136,9 +167,6 @@ public class Wait extends AbstractProcessor {
.build();
private final Set<Relationship> relationships;
- private final Serializer<String> keySerializer = new StringSerializer();
- private final Deserializer<Map<String, String>> valueDeserializer = new FlowFileAttributesSerializer();
-
public Wait() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
@@ -152,6 +180,8 @@ public class Wait extends AbstractProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
+ descriptors.add(TARGET_SIGNAL_COUNT);
+ descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(EXPIRATION_DURATION);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_COPY_MODE);
@@ -173,11 +203,11 @@ public class Wait extends AbstractProcessor {
final ComponentLog logger = getLogger();
- // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
- final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+ // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
+ final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
// if the computed value is null, or empty, we transfer the flow file to failure relationship
- if (StringUtils.isBlank(cacheKey)) {
+ if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
@@ -185,9 +215,17 @@ public class Wait extends AbstractProcessor {
}
// the cache client used to interact with the distributed cache
- final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+ final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
+
+ String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
+ final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
+ Signal signal = null;
try {
+ // get notifying signal
+ signal = protocol.getSignal(signalId);
+
// check for expiration
String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
if (waitStartTimestamp == null) {
@@ -201,6 +239,8 @@ public class Wait extends AbstractProcessor {
} catch (NumberFormatException nfe) {
logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
flowFile = session.penalize(flowFile);
+
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -209,58 +249,83 @@ public class Wait extends AbstractProcessor {
long now = System.currentTimeMillis();
if (now > (lWaitStartTimestamp + expirationDuration)) {
logger.warn("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)});
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_EXPIRED);
return;
}
- // get notifying flow file attributes
- Map<String, String> cachedAttributes = cache.get(cacheKey, keySerializer, valueDeserializer);
-
- if (cachedAttributes == null) {
+ if (signal == null) {
+ // If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
if (logger.isDebugEnabled()) {
- logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {cacheKey, flowFile});
+ logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile});
}
session.transfer(flowFile, REL_WAIT);
return;
}
- // copy over attributes from release signal flow file, if provided
- if (!cachedAttributes.isEmpty()) {
- cachedAttributes.remove("uuid");
- String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
- if (ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode)) {
- flowFile = session.putAllAttributes(flowFile, cachedAttributes);
- } else {
- Map<String, String> attributesToCopy = new HashMap<>();
- for(Entry<String, String> entry : cachedAttributes.entrySet()) {
- // if the current flow file does *not* have the cached attribute, copy it
- if (flowFile.getAttribute(entry.getKey()) == null) {
- attributesToCopy.put(entry.getKey(), entry.getValue());
- }
- }
- flowFile = session.putAllAttributes(flowFile, attributesToCopy);
+ final String targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final Long targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
+ final boolean reachedToTargetCount = StringUtils.isBlank(targetCounterName)
+ ? signal.isTotalCountReached(targetCount)
+ : signal.isCountReached(targetCounterName, targetCount);
+
+ if (!reachedToTargetCount) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Release signal count {} hasn't reached {} for {} on FlowFile {}",
+ new Object[] {targetCounterName, targetCount, signalId, flowFile});
}
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
+ session.transfer(flowFile, REL_WAIT);
+ return;
}
+
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_SUCCESS);
- cache.remove(cacheKey, keySerializer);
+ protocol.complete(signalId);
+
+ } catch (final NumberFormatException e) {
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e});
+
} catch (final IOException e) {
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
}
}
- /**
- * Simple string serializer, used for serializing the cache key
- */
- public static class StringSerializer implements Serializer<String> {
+ private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
+ if (signal == null) {
+ return flowFile;
+ }
- @Override
- public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
- out.write(value.getBytes(StandardCharsets.UTF_8));
+ // copy over attributes from release signal flow file, if provided
+ final Map<String, String> attributesToCopy;
+ if (replaceOriginal) {
+ attributesToCopy = new HashMap<>(signal.getAttributes());
+ attributesToCopy.remove("uuid");
+ } else {
+ // if the current flow file does *not* have the cached attribute, copy it
+ attributesToCopy = signal.getAttributes().entrySet().stream()
+ .filter(e -> flowFile.getAttribute(e.getKey()) == null)
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
}
+
+ // Copy counter attributes
+ final Map<String, Long> counts = signal.getCounts();
+ final long totalCount = counts.entrySet().stream().mapToLong(e -> {
+ final Long count = e.getValue();
+ attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
+ return count;
+ }).sum();
+ attributesToCopy.put("wait.counter.total", String.valueOf(totalCount));
+
+ return session.putAllAttributes(flowFile, attributesToCopy);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
new file mode 100644
index 0000000..a74590a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provide a protocol for Wait and Notify processors to work together.
+ * Once AtomicDistributedMapCacheClient is passed to this protocol, components that wish to join the notification mechanism
+ * should only use methods provided by this protocol, instead of calling cache API directly.
+ */
+public class WaitNotifyProtocol {
+
+ private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);
+
+ public static final String DEFAULT_COUNT_NAME = "default";
+ private static final int MAX_REPLACE_RETRY_COUNT = 5;
+ private static final int REPLACE_RETRY_WAIT_MILLIS = 10;
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final Serializer<String> stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
+ private final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
+
+ public static class Signal {
+ private Map<String, Long> counts = new HashMap<>();
+ private Map<String, String> attributes = new HashMap<>();
+
+ public Map<String, Long> getCounts() {
+ return counts;
+ }
+
+ public void setCounts(Map<String, Long> counts) {
+ this.counts = counts;
+ }
+
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+
+ public void setAttributes(Map<String, String> attributes) {
+ this.attributes = attributes;
+ }
+
+ public boolean isTotalCountReached(final long targetCount) {
+ final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum();
+ return totalCount >= targetCount;
+ }
+
+ public boolean isCountReached(final String counterName, final long targetCount) {
+ return getCount(counterName) >= targetCount;
+ }
+
+ public long getCount(final String counterName) {
+ final Long count = counts.get(counterName);
+ return count != null ? count : -1;
+ }
+
+ }
+
+ private final AtomicDistributedMapCacheClient cache;
+
+ public WaitNotifyProtocol(final AtomicDistributedMapCacheClient cache) {
+ this.cache = cache;
+ }
+
+ /**
+ * Notify a signal to increase a counter.
+ * @param signalId a key in the underlying cache engine
+ * @param counterName specify count to update
+ * @param delta delta to update a counter
+ * @param attributes attributes to save in the cache entry
+ * @return A Signal instance, merged with an existing signal if any
+ * @throws IOException thrown when it failed interacting with the cache engine
+ * @throws ConcurrentModificationException thrown if other process is also updating the same signal and failed to update after few retry attempts
+ */
+ public Signal notify(final String signalId, final String counterName, final int delta, final Map<String, String> attributes)
+ throws IOException, ConcurrentModificationException {
+
+ for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) {
+
+ final CacheEntry<String, String> existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer);
+
+ Signal signal = getSignal(signalId);
+ if (signal == null) {
+ signal = new Signal();
+ }
+
+ if (attributes != null) {
+ signal.attributes.putAll(attributes);
+ }
+
+ long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0;
+ count += delta;
+ signal.counts.put(counterName, count);
+
+ final String signalJson = objectMapper.writeValueAsString(signal);
+ final long revision = existingEntry != null ? existingEntry.getRevision() : -1;
+
+
+ if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) {
+ return signal;
+ }
+
+ long waitMillis = REPLACE_RETRY_WAIT_MILLIS * (i + 1);
+ logger.info("Waiting for {} ms to retry... {}.{}", waitMillis, signalId, counterName);
+ try {
+ Thread.sleep(waitMillis);
+ } catch (InterruptedException e) {
+ final String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, counterName);
+ throw new ConcurrentModificationException(msg, e);
+ }
+ }
+
+ final String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, counterName, MAX_REPLACE_RETRY_COUNT);
+ throw new ConcurrentModificationException(msg);
+ }
+
+ /**
+ * Retrieve a stored Signal in the cache engine.
+ * If a caller gets satisfied with the returned Signal state and finish waiting, it should call {@link #complete(String)}
+ * to complete the Wait Notify protocol.
+ * @param signalId a key in the underlying cache engine
+ * @return A Signal instance
+ * @throws IOException thrown when it failed interacting with the cache engine
+ * @throws DeserializationException thrown if the cache found is not in expected serialized format
+ */
+ public Signal getSignal(final String signalId) throws IOException, DeserializationException {
+
+ final CacheEntry<String, String> entry = cache.fetch(signalId, stringSerializer, stringDeserializer);
+
+ if (entry == null) {
+ // No signal found.
+ return null;
+ }
+
+ final String value = entry.getValue();
+
+ try {
+ return objectMapper.readValue(value, Signal.class);
+ } catch (final JsonParseException jsonE) {
+ // Try to read it as FlowFileAttributes for backward compatibility.
+ try {
+ final Map<String, String> attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8));
+ final Signal signal = new Signal();
+ signal.setAttributes(attributes);
+ signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
+ return signal;
+ } catch (Exception attrE) {
+ final String msg = String.format("Cached value for %s was not a serialized Signal nor FlowFileAttributes. Error messages: \"%s\", \"%s\"",
+ signalId, jsonE.getMessage(), attrE.getMessage());
+ throw new DeserializationException(msg);
+ }
+ }
+ }
+
+ /**
+ * Finish protocol and remove the cache entry.
+ * @param signalId a key in the underlying cache engine
+ * @throws IOException thrown when it failed interacting with the cache engine
+ */
+ public void complete(final String signalId) throws IOException {
+ cache.remove(signalId, stringSerializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index 7e95ada..f808072 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -37,6 +37,9 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+
public class TestConvertJSONToSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
@@ -70,6 +73,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -106,6 +110,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -140,6 +145,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -175,6 +181,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@@ -210,6 +217,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
for (final MockFlowFile mff : mffs) {
@@ -245,6 +253,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
for (final MockFlowFile mff : mffs) {
@@ -279,6 +288,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@@ -314,6 +324,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@@ -349,6 +360,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
@@ -384,6 +396,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -420,6 +433,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -455,6 +469,9 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
+ originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -590,6 +607,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -625,6 +643,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -687,6 +706,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -724,6 +744,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));