Posted to commits@nifi.apache.org by bb...@apache.org on 2017/01/19 20:18:28 UTC

[2/2] nifi git commit: NIFI-3216: Add N signals to Wait/Notify

NIFI-3216: Add N signals to Wait/Notify

- Support counters at the Wait/Notify processors so that a NiFi flow can
  be configured to wait for N signals
- Extract the Wait/Notify logic into WaitNotifyProtocol
- Add FragmentAttributes to manage commonly used fragment attributes
- Change existing split processors to set 'fragment.identifier' and
  'fragment.count', so that Wait can use those to wait until all splits
  are processed, as illustrated below
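
For example (based on the updated Wait processor description in this
commit), a flow can wait until every split of a source FlowFile has been
processed by wiring the relationships and properties as follows; the
processors shown are illustrative:

    SplitText --(splits)---> ...processing... --> Notify
    SplitText --(original)-> Wait --(success)--> downstream

    Notify:  Release Signal Identifier = ${fragment.identifier}
    Wait:    Release Signal Identifier = ${fragment.identifier}
             Target Signal Count       = ${fragment.count}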

This closes #1420.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f0171ff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f0171ff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f0171ff

Branch: refs/heads/master
Commit: 7f0171ffa25bfd8932e49b3367049b101799dea4
Parents: e62eeb7
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri Jan 13 16:52:30 2017 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Jan 19 15:17:59 2017 -0500

----------------------------------------------------------------------
 .../flowfile/attributes/FragmentAttributes.java |  73 +++++
 .../apache/nifi/processors/avro/SplitAvro.java  |  17 +-
 .../nifi/processors/avro/TestSplitAvro.java     |  13 +-
 .../processors/standard/ConvertJSONToSQL.java   |  12 +-
 .../nifi/processors/standard/MergeContent.java  |   9 +-
 .../apache/nifi/processors/standard/Notify.java |  52 ++--
 .../apache/nifi/processors/standard/PutSQL.java |   7 +-
 .../processors/standard/SegmentContent.java     |  10 +-
 .../nifi/processors/standard/SplitContent.java  |  18 +-
 .../nifi/processors/standard/SplitJson.java     |  18 +-
 .../nifi/processors/standard/SplitText.java     |  21 +-
 .../nifi/processors/standard/SplitXml.java      |  19 +-
 .../nifi/processors/standard/UnpackContent.java |  11 +-
 .../apache/nifi/processors/standard/Wait.java   | 163 +++++++---
 .../processors/standard/WaitNotifyProtocol.java | 193 ++++++++++++
 .../standard/TestConvertJSONToSQL.java          |  21 ++
 .../nifi/processors/standard/TestNotify.java    | 128 ++++++--
 .../processors/standard/TestSegmentContent.java |  10 +
 .../processors/standard/TestSplitContent.java   |  21 ++
 .../nifi/processors/standard/TestSplitJson.java |  24 +-
 .../nifi/processors/standard/TestSplitText.java |  25 +-
 .../nifi/processors/standard/TestSplitXml.java  |  17 +-
 .../processors/standard/TestUnpackContent.java  |  26 ++
 .../nifi/processors/standard/TestWait.java      | 311 ++++++++++++++-----
 .../standard/TestWaitNotifyProtocol.java        | 245 +++++++++++++++
 25 files changed, 1214 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java
new file mode 100644
index 0000000..6f055da
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This enum class contains flow file attribute keys commonly used among Split processors.
+ */
+public enum FragmentAttributes implements FlowFileAttributeKey {
+
+    /**
+     * The number of bytes from the original FlowFile that were copied to this FlowFile,
+     * including the header, if applicable, which is duplicated in each split FlowFile.
+     */
+    FRAGMENT_SIZE("fragment.size"),
+    /**
+     * All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute.
+     */
+    FRAGMENT_ID("fragment.identifier"),
+    /**
+     * A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile.
+     */
+    FRAGMENT_INDEX("fragment.index"),
+    /**
+     * The number of split FlowFiles generated from the parent FlowFile.
+     */
+    FRAGMENT_COUNT("fragment.count"),
+    /**
+     * The filename of the parent FlowFile.
+     */
+    SEGMENT_ORIGINAL_FILENAME("segment.original.filename");
+
+    private final String key;
+
+    FragmentAttributes(final String key) {
+        this.key = key;
+    }
+
+    @Override
+    public String key() {
+        return key;
+    }
+
+    public static FlowFile copyAttributesToOriginal(final ProcessSession processSession, final FlowFile originalFlowFile,
+                                                    final String fragmentId, final int fragmentCount) {
+        final Map<String, String> attributesToOriginal = new HashMap<>();
+        if (fragmentId != null && fragmentId.length() > 0) {
+            attributesToOriginal.put(FRAGMENT_ID.key(), fragmentId);
+        }
+        attributesToOriginal.put(FRAGMENT_COUNT.key(), String.valueOf(fragmentCount));
+        return processSession.putAllAttributes(originalFlowFile, attributesToOriginal);
+    }
+
+}
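
As a usage illustration (not part of this commit), a split-style processor
can tag each split and then stamp the original FlowFile via the helper
above, mirroring the SplitAvro change below; the relationship handling and
transfer calls are omitted, and the class name is hypothetical:

    import java.util.List;
    import java.util.UUID;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;

    import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
    import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
    import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
    import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;

    class FragmentTaggingSketch {

        // Tags each split with a shared identifier, its index, and the total
        // count, then returns the original FlowFile updated with
        // 'fragment.identifier' and 'fragment.count' so a downstream Wait
        // processor can wait for all fragments.
        static FlowFile tagSplits(final ProcessSession session, final FlowFile original, final List<FlowFile> splits) {
            final String fragmentId = UUID.randomUUID().toString();
            for (int i = 0; i < splits.size(); i++) {
                FlowFile split = splits.get(i);
                split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentId);
                split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(i));
                split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(splits.size()));
                splits.set(i, split); // keep the updated FlowFile reference
            }
            return copyAttributesToOriginal(session, original, fragmentId, splits.size());
        }
    }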

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
index 83964fa..3a28917 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
@@ -64,6 +64,12 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
 @SideEffectFree
 @SupportsBatching
 @Tags({ "avro", "split" })
@@ -217,13 +223,14 @@ public class SplitAvro extends AbstractProcessor {
             final String fragmentIdentifier = UUID.randomUUID().toString();
             IntStream.range(0, splits.size()).forEach((i) -> {
                 FlowFile split = splits.get(i);
-                split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
-                split = session.putAttribute(split, "fragment.index", Integer.toString(i));
-                split = session.putAttribute(split, "segment.original.filename", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
-                split = session.putAttribute(split, "fragment.count", Integer.toString(splits.size()));
+                split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier);
+                split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(i));
+                split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+                split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(splits.size()));
                 session.transfer(split, REL_SPLIT);
             });
-            session.transfer(flowFile, REL_ORIGINAL);
+            final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size());
+            session.transfer(originalFlowFile, REL_ORIGINAL);
         } catch (ProcessException e) {
             getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
             session.transfer(flowFile, REL_FAILURE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
index 32d43e3..c17842d 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
@@ -44,6 +44,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
 import static org.junit.Assert.assertEquals;
 
 public class TestSplitAvro {
@@ -116,6 +118,9 @@ public class TestSplitAvro {
         runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
         runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+        final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0);
+        originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
+        originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
         final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
         checkDataFileSplitSize(flowFiles, 1, true);
         final String fragmentIdentifier = flowFiles.get(0).getAttribute("fragment.identifier");
@@ -123,7 +128,7 @@ public class TestSplitAvro {
             MockFlowFile flowFile = flowFiles.get(i);
             assertEquals(i, Integer.parseInt(flowFile.getAttribute("fragment.index")));
             assertEquals(fragmentIdentifier, flowFile.getAttribute("fragment.identifier"));
-            assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute("fragment.count")));
+            assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute(FRAGMENT_COUNT.key())));
             assertEquals(filename, flowFile.getAttribute("segment.original.filename"));
         });
     }
@@ -140,6 +145,7 @@ public class TestSplitAvro {
         runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
 
+        runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
         final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
         checkDataFileSplitSize(flowFiles, 20, true);
     }
@@ -156,6 +162,7 @@ public class TestSplitAvro {
         runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
 
+        runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
         checkDataFileSplitSize(flowFiles, 100, true);
     }
@@ -172,6 +179,7 @@ public class TestSplitAvro {
         runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
 
+        runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
         final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
         checkDataFileSplitSize(flowFiles, 1, false);
 
@@ -197,6 +205,7 @@ public class TestSplitAvro {
         runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
 
+        runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
         final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
 
         checkBareRecordsSplitSize(flowFiles, 1, true);
@@ -215,6 +224,7 @@ public class TestSplitAvro {
         runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
 
+        runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
         final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
 
         checkBareRecordsSplitSize(flowFiles, 20, true);
@@ -234,6 +244,7 @@ public class TestSplitAvro {
         runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
 
+        runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
         final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
 
         checkBareRecordsSplitSize(flowFiles, 20, false);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 2db7bcc..1ce6fb5 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -65,6 +65,11 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.JsonNodeFactory;
 
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
 @SideEffectFree
 @SupportsBatching
 @SeeAlso(PutSQL.class)
@@ -369,9 +374,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
             attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
             attributes.put("sql.table", tableName);
-            attributes.put("fragment.identifier", fragmentIdentifier);
-            attributes.put("fragment.count", String.valueOf(arrayNode.size()));
-            attributes.put("fragment.index", String.valueOf(i));
+            attributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
+            attributes.put(FRAGMENT_COUNT.key(), String.valueOf(arrayNode.size()));
+            attributes.put(FRAGMENT_INDEX.key(), String.valueOf(i));
 
             if (catalog != null) {
                 attributes.put("sql.catalog", catalog);
@@ -381,6 +386,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             session.transfer(sqlFlowFile, REL_SQL);
         }
 
+        flowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, arrayNode.size());
         session.transfer(flowFile, REL_ORIGINAL);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 16bb911..f18416e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -67,6 +67,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -127,15 +128,15 @@ import org.apache.nifi.util.FlowFilePackagerV3;
 public class MergeContent extends BinFiles {
 
     // preferred attributes
-    public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
-    public static final String FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
-    public static final String FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
+    public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
 
     // old style attributes
     public static final String SEGMENT_ID_ATTRIBUTE = "segment.identifier";
     public static final String SEGMENT_INDEX_ATTRIBUTE = "segment.index";
     public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count";
-    public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
 
     public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
             "Bin-Packing Algorithm",

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
index 32cd249..9d809ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
@@ -17,8 +17,6 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,9 +35,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -49,7 +45,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
 
 @EventDriven
 @SupportsBatching
@@ -67,7 +62,7 @@ public class Notify extends AbstractProcessor {
         .name("Distributed Cache Service")
         .description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor")
         .required(true)
-        .identifiesControllerService(DistributedMapCacheClient.class)
+        .identifiesControllerService(AtomicDistributedMapCacheClient.class)
         .build();
 
     // Selects the FlowFile attribute or expression, whose value is used as cache key
@@ -80,6 +75,18 @@ public class Notify extends AbstractProcessor {
         .expressionLanguageSupported(true)
         .build();
 
+    public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
+        .name("Signal Counter Name")
+        .description("A value, or the results of an Attribute Expression Language statement, which will " +
+            "be evaluated against a FlowFile in order to determine the signal counter name. " +
+            "Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " +
+            "of different types of events, such as success or failure, or destination data source names, etc.")
+        .required(true)
+        .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+        .expressionLanguageSupported(true)
+        .defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME)
+        .build();
+
     // Specifies an optional regex used to identify which attributes to cache
     public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder()
         .name("Attribute Cache Regex")
@@ -103,9 +110,6 @@ public class Notify extends AbstractProcessor {
 
     private final Set<Relationship> relationships;
 
-    private final Serializer<String> keySerializer = new StringSerializer();
-    private final Serializer<Map<String, String>> valueSerializer = new FlowFileAttributesSerializer();
-
     public Notify() {
         final Set<Relationship> rels = new HashSet<>();
         rels.add(REL_SUCCESS);
@@ -117,6 +121,7 @@ public class Notify extends AbstractProcessor {
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
+        descriptors.add(SIGNAL_COUNTER_NAME);
         descriptors.add(DISTRIBUTED_CACHE_SERVICE);
         descriptors.add(ATTRIBUTE_CACHE_REGEX);
         return descriptors;
@@ -137,11 +142,12 @@ public class Notify extends AbstractProcessor {
 
         final ComponentLog logger = getLogger();
 
-        // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
-        final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+        // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
+        final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+        final String counterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
 
         // if the computed value is null, or empty, we transfer the flow file to failure relationship
-        if (StringUtils.isBlank(cacheKey)) {
+        if (StringUtils.isBlank(signalId)) {
             logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
@@ -149,7 +155,8 @@ public class Notify extends AbstractProcessor {
         }
 
         // the cache client used to interact with the distributed cache
-        final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+        final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
+        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
 
         try {
             final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet())
@@ -164,10 +171,12 @@ public class Notify extends AbstractProcessor {
             }
 
             if (logger.isDebugEnabled()) {
-                logger.debug("Cached release signal identifier {} from FlowFile {}", new Object[] {cacheKey, flowFile});
+                logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[] {signalId, counterName, flowFile});
             }
 
-            cache.put(cacheKey, attributesToCache, keySerializer, valueSerializer);
+            // In case of ConcurrentModificationException, just throw the exception so that the processor can
+            // retry after yielding for a while.
+            protocol.notify(signalId, counterName, 1, attributesToCache);
 
             session.transfer(flowFile, REL_SUCCESS);
         } catch (final IOException e) {
@@ -177,15 +186,4 @@ public class Notify extends AbstractProcessor {
         }
     }
 
-    /**
-     * Simple string serializer, used for serializing the cache key
-     */
-    public static class StringSerializer implements Serializer<String> {
-
-        @Override
-        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
-            out.write(value.getBytes(StandardCharsets.UTF_8));
-        }
-    }
-
 }
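
For illustration only, the protocol call introduced above can maintain
separate named counters under a single signal identifier, which is what the
new 'Signal Counter Name' property feeds into; the signal id and counter
names below are made up, and the third argument is assumed to be the
increment applied to the counter:

    import java.io.IOException;
    import java.util.Collections;

    import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
    import org.apache.nifi.processors.standard.WaitNotifyProtocol;

    class NotifySketch {

        // Records one 'success' and one 'failure' event against the same
        // signal id; a corresponding Wait can then check either counter or
        // the total count.
        static void emitSignals(final AtomicDistributedMapCacheClient cache) throws IOException {
            final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
            protocol.notify("batch-42", "success", 1, Collections.emptyMap());
            protocol.notify("batch-42", "failure", 1, Collections.emptyMap());
        }
    }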

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index adfac05..76901fe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessContext;
@@ -161,9 +162,9 @@ public class PutSQL extends AbstractProcessor {
     private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
     private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
 
-    private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
-    private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
-    private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
+    private static final String FRAGMENT_ID_ATTR = FragmentAttributes.FRAGMENT_ID.key();
+    private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key();
+    private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key();
 
     private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$");
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
index 4e25d3c..1fc1feb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
@@ -38,6 +38,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
@@ -80,11 +81,11 @@ public class SegmentContent extends AbstractProcessor {
     public static final String SEGMENT_ID = "segment.identifier";
     public static final String SEGMENT_INDEX = "segment.index";
     public static final String SEGMENT_COUNT = "segment.count";
-    public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
 
-    public static final String FRAGMENT_ID = "fragment.identifier";
-    public static final String FRAGMENT_INDEX = "fragment.index";
-    public static final String FRAGMENT_COUNT = "fragment.count";
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
 
     public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder()
             .name("Segment Size")
@@ -180,6 +181,7 @@ public class SegmentContent extends AbstractProcessor {
         }
 
         session.transfer(segmentSet, REL_SEGMENTS);
+        flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, segmentId, totalSegments);
         session.transfer(flowFile, REL_ORIGINAL);
 
         if (totalSegments <= 10) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
index 52b124e..d20fe8c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
@@ -50,6 +50,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -77,10 +78,10 @@ import org.apache.nifi.util.Tuple;
 public class SplitContent extends AbstractProcessor {
 
     // attribute keys
-    public static final String FRAGMENT_ID = "fragment.identifier";
-    public static final String FRAGMENT_INDEX = "fragment.index";
-    public static final String FRAGMENT_COUNT = "fragment.count";
-    public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
 
     static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes");
     static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text");
@@ -182,7 +183,7 @@ public class SplitContent extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        final FlowFile flowFile = session.get();
+        FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }
@@ -285,8 +286,9 @@ public class SplitContent extends AbstractProcessor {
             splitList.add(finalSplit);
         }
 
-        finishFragmentAttributes(session, flowFile, splitList);
+        final String fragmentId = finishFragmentAttributes(session, flowFile, splitList);
         session.transfer(splitList, REL_SPLITS);
+        flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, splitList.size());
         session.transfer(flowFile, REL_ORIGINAL);
 
         if (splitList.size() > 10) {
@@ -302,8 +304,9 @@ public class SplitContent extends AbstractProcessor {
      * @param session session
      * @param source source
      * @param splits splits
+     * @return generated fragment identifier for the splits
      */
-    private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
+    private String finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
         final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
 
         final String fragmentId = UUID.randomUUID().toString();
@@ -319,6 +322,7 @@ public class SplitContent extends AbstractProcessor {
             FlowFile newFF = session.putAllAttributes(ff, attributes);
             splits.add(newFF);
         }
+        return fragmentId;
     }
 
     static class HexStringPropertyValidator implements Validator {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index 003834e..e7df459 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -56,6 +56,12 @@ import com.jayway.jsonpath.InvalidJsonException;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.PathNotFoundException;
 
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
 @EventDriven
 @SideEffectFree
 @SupportsBatching
@@ -166,7 +172,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
 
     @Override
     public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
-        final FlowFile original = processSession.get();
+        FlowFile original = processSession.get();
         if (original == null) {
             return;
         }
@@ -202,8 +208,9 @@ public class SplitJson extends AbstractJsonPathProcessor {
         List resultList = (List) jsonPathResult;
 
         Map<String, String> attributes = new HashMap<>();
-        attributes.put("fragment.identifier", UUID.randomUUID().toString());
-        attributes.put("fragment.count", Integer.toString(resultList.size()));
+        final String fragmentId = UUID.randomUUID().toString();
+        attributes.put(FRAGMENT_ID.key(), fragmentId);
+        attributes.put(FRAGMENT_COUNT.key(), Integer.toString(resultList.size()));
 
         for (int i = 0; i < resultList.size(); i++) {
             Object resultSegment = resultList.get(i);
@@ -213,11 +220,12 @@ public class SplitJson extends AbstractJsonPathProcessor {
                         out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
                     }
             );
-            attributes.put("segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
-            attributes.put("fragment.index", Integer.toString(i));
+            attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key()));
+            attributes.put(FRAGMENT_INDEX.key(), Integer.toString(i));
             processSession.transfer(processSession.putAllAttributes(split, attributes), REL_SPLIT);
         }
 
+        original = copyAttributesToOriginal(processSession, original, fragmentId, resultList.size());
         processSession.transfer(original, REL_ORIGINAL);
         logger.info("Split {} into {} FlowFiles", new Object[]{original, resultList.size()});
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index ddb770d..e57841f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@ -48,6 +48,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
@@ -86,11 +87,11 @@ import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
 public class SplitText extends AbstractProcessor {
     // attribute keys
     public static final String SPLIT_LINE_COUNT = "text.line.count";
-    public static final String FRAGMENT_SIZE = "fragment.size";
-    public static final String FRAGMENT_ID = "fragment.identifier";
-    public static final String FRAGMENT_INDEX = "fragment.index";
-    public static final String FRAGMENT_COUNT = "fragment.count";
-    public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+    public static final String FRAGMENT_SIZE = FragmentAttributes.FRAGMENT_SIZE.key();
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
 
     public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder()
             .name("Line Split Count")
@@ -250,8 +251,10 @@ public class SplitText extends AbstractProcessor {
         if (error.get()){
             processSession.transfer(sourceFlowFile, REL_FAILURE);
         } else {
-            List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
-            processSession.transfer(sourceFlowFile, REL_ORIGINAL);
+            final String fragmentId = UUID.randomUUID().toString();
+            List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
+            final FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal(processSession, sourceFlowFile, fragmentId, splitFlowFiles.size());
+            processSession.transfer(originalFlowFile, REL_ORIGINAL);
             if (!splitFlowFiles.isEmpty()) {
                 processSession.transfer(splitFlowFiles, REL_SPLITS);
             }
@@ -279,7 +282,8 @@ public class SplitText extends AbstractProcessor {
      * it signifies the header information and its contents will be included in
      * each and every computed split.
      */
-    private List<FlowFile> generateSplitFlowFiles(FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
+    private List<FlowFile> generateSplitFlowFiles(final String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo,
+                                                  List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
         List<FlowFile> splitFlowFiles = new ArrayList<>();
         FlowFile headerFlowFile = null;
         long headerCrlfLength = 0;
@@ -288,7 +292,6 @@ public class SplitText extends AbstractProcessor {
             headerCrlfLength = splitInfo.trimmedLength;
         }
         int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
-        String fragmentId = UUID.randomUUID().toString();
 
         if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) {
             FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
index 0f0032a..502f7f3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
@@ -64,6 +64,12 @@ import org.xml.sax.Locator;
 import org.xml.sax.SAXException;
 import org.xml.sax.XMLReader;
 
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
 @EventDriven
 @SideEffectFree
 @SupportsBatching
@@ -161,9 +167,9 @@ public class SplitXml extends AbstractProcessor {
         final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> {
             FlowFile split = session.create(original);
             split = session.write(split, out -> out.write(xmlTree.getBytes("UTF-8")));
-            split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
-            split = session.putAttribute(split, "fragment.index", Integer.toString(numberOfRecords.getAndIncrement()));
-            split = session.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
+            split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier);
+            split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(numberOfRecords.getAndIncrement()));
+            split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key()));
             splits.add(split);
         }, depth);
 
@@ -188,12 +194,13 @@ public class SplitXml extends AbstractProcessor {
             session.remove(splits);
         } else {
             splits.forEach((split) -> {
-                split = session.putAttribute(split, "fragment.count", Integer.toString(numberOfRecords.get()));
+                split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(numberOfRecords.get()));
                 session.transfer(split, REL_SPLIT);
             });
 
-            session.transfer(original, REL_ORIGINAL);
-            logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()});
+            final FlowFile originalToTransfer = copyAttributesToOriginal(session, original, fragmentIdentifier, numberOfRecords.get());
+            session.transfer(originalToTransfer, REL_ORIGINAL);
+            logger.info("Split {} into {} FlowFiles", new Object[]{originalToTransfer, splits.size()});
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 0437ed1..18015fa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -53,6 +53,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -97,10 +98,10 @@ import org.apache.nifi.util.FlowFileUnpackagerV3;
 @SeeAlso(MergeContent.class)
 public class UnpackContent extends AbstractProcessor {
     // attribute keys
-    public static final String FRAGMENT_ID = "fragment.identifier";
-    public static final String FRAGMENT_INDEX = "fragment.index";
-    public static final String FRAGMENT_COUNT = "fragment.count";
-    public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
 
     public static final String AUTO_DETECT_FORMAT_NAME = "use mime.type attribute";
     public static final String TAR_FORMAT_NAME = "tar";
@@ -262,6 +263,8 @@ public class UnpackContent extends AbstractProcessor {
                 finishFragmentAttributes(session, flowFile, unpacked);
             }
             session.transfer(unpacked, REL_SUCCESS);
+            final String fragmentId = unpacked.size() > 0 ? unpacked.get(0).getAttribute(FRAGMENT_ID) : null;
+            flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size());
             session.transfer(flowFile, REL_ORIGINAL);
             session.getProvenanceReporter().fork(flowFile, unpacked);
             logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked});

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 4a4f803..0bd5ca6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -17,8 +17,6 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -35,15 +34,13 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -53,19 +50,30 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
 
 @EventDriven
 @SupportsBatching
 @Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal "
-        + "is stored in the distributed cache from a corresponding Notify processor.  At this point, a waiting FlowFile is routed to "
-        + "the 'success' relationship, with attributes copied from the FlowFile that produced "
-        + "the release signal from the Notify processor.  The release signal entry is then removed from "
-        + "the cache.  Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration.")
-@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
-        + "initial epoch timestamp when the file first entered this processor.  This is used to determine the expiration time of the FlowFile.")
+        + "is stored in the distributed cache from a corresponding Notify processor. "
+        + "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship, "
+        + "with attributes copied from the FlowFile that produced the release signal from the Notify processor.  "
+        + "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+
+        + "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
+        + "This is particularly useful with processors that split a source flow file into multiple fragments, such as SplitText. "
+        + "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to "
+        + "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value "
+        + "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+        + "initial epoch timestamp when the file first entered this processor.  This is used to determine the expiration time of the FlowFile."),
+        @WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, "
+        + "each count value in the signal is copied.")
+})
 @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
         "org.apache.nifi.processors.standard.Notify"})
 public class Wait extends AbstractProcessor {
@@ -77,7 +85,7 @@ public class Wait extends AbstractProcessor {
         .name("Distributed Cache Service")
         .description("The Controller Service that is used to check for release signals from a corresponding Notify processor")
         .required(true)
-        .identifiesControllerService(DistributedMapCacheClient.class)
+        .identifiesControllerService(AtomicDistributedMapCacheClient.class)
         .build();
 
     // Selects the FlowFile attribute or expression, whose value is used as cache key
@@ -90,6 +98,29 @@ public class Wait extends AbstractProcessor {
         .expressionLanguageSupported(true)
         .build();
 
+    public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
+            .name("Target Signal Count")
+            .description("A value, or the results of an Attribute Expression Language statement, which will " +
+                    "be evaluated against a FlowFile in order to determine the target signal count. " +
+                    "This processor checks whether the signal count has reached this number. " +
+                    "If Signal Counter Name is specified, this processor checks a particular counter, " +
+                    "otherwise checks against total count in a signal.")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("1")
+            .build();
+
+    public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
+            .name("Signal Counter Name")
+            .description("A value, or the results of an Attribute Expression Language statement, which will " +
+                    "be evaluated against a FlowFile in order to determine the signal counter name. " +
+                    "If not specified, this processor checks the total count in a signal.")
+            .required(false)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+            .expressionLanguageSupported(true)
+            .build();
+
     // Selects the FlowFile attribute or expression, whose value is used as cache key
     public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
         .name("Expiration Duration")
@@ -136,9 +167,6 @@ public class Wait extends AbstractProcessor {
         .build();
     private final Set<Relationship> relationships;
 
-    private final Serializer<String> keySerializer = new StringSerializer();
-    private final Deserializer<Map<String, String>> valueDeserializer = new FlowFileAttributesSerializer();
-
     public Wait() {
         final Set<Relationship> rels = new HashSet<>();
         rels.add(REL_SUCCESS);
@@ -152,6 +180,8 @@ public class Wait extends AbstractProcessor {
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
+        descriptors.add(TARGET_SIGNAL_COUNT);
+        descriptors.add(SIGNAL_COUNTER_NAME);
         descriptors.add(EXPIRATION_DURATION);
         descriptors.add(DISTRIBUTED_CACHE_SERVICE);
         descriptors.add(ATTRIBUTE_COPY_MODE);
@@ -173,11 +203,11 @@ public class Wait extends AbstractProcessor {
 
         final ComponentLog logger = getLogger();
 
-        // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
-        final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+        // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
+        final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
 
         // if the computed value is null, or empty, we transfer the flow file to failure relationship
-        if (StringUtils.isBlank(cacheKey)) {
+        if (StringUtils.isBlank(signalId)) {
             logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
@@ -185,9 +215,17 @@ public class Wait extends AbstractProcessor {
         }
 
         // the cache client used to interact with the distributed cache
-        final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+        final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
+        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
+
+        String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
+        final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
 
+        Signal signal = null;
         try {
+            // get notifying signal
+            signal = protocol.getSignal(signalId);
+
             // check for expiration
             String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
             if (waitStartTimestamp == null) {
@@ -201,6 +239,8 @@ public class Wait extends AbstractProcessor {
             } catch (NumberFormatException nfe) {
                 logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
                 flowFile = session.penalize(flowFile);
+
+                flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
                 session.transfer(flowFile, REL_FAILURE);
                 return;
             }
@@ -209,58 +249,83 @@ public class Wait extends AbstractProcessor {
             long now = System.currentTimeMillis();
             if (now > (lWaitStartTimestamp + expirationDuration)) {
                 logger.warn("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)});
+                flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
                 session.transfer(flowFile, REL_EXPIRED);
                 return;
             }
 
-            // get notifying flow file attributes
-            Map<String, String> cachedAttributes = cache.get(cacheKey, keySerializer, valueDeserializer);
-
-            if (cachedAttributes == null) {
+            if (signal == null) {
+                // If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
                 if (logger.isDebugEnabled()) {
-                    logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {cacheKey, flowFile});
+                    logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile});
                 }
                 session.transfer(flowFile, REL_WAIT);
                 return;
             }
 
-            // copy over attributes from release signal flow file, if provided
-            if (!cachedAttributes.isEmpty()) {
-                cachedAttributes.remove("uuid");
-                String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
-                if (ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode)) {
-                    flowFile = session.putAllAttributes(flowFile, cachedAttributes);
-                } else {
-                    Map<String, String> attributesToCopy = new HashMap<>();
-                    for(Entry<String, String> entry : cachedAttributes.entrySet()) {
-                        // if the current flow file does *not* have the cached attribute, copy it
-                        if (flowFile.getAttribute(entry.getKey()) == null) {
-                            attributesToCopy.put(entry.getKey(), entry.getValue());
-                        }
-                    }
-                    flowFile = session.putAllAttributes(flowFile, attributesToCopy);
+            final String targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final Long targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
+            final boolean targetCountReached = StringUtils.isBlank(targetCounterName)
+                    ? signal.isTotalCountReached(targetCount)
+                    : signal.isCountReached(targetCounterName, targetCount);
+
+            if (!targetCountReached) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Release signal count {} hasn't reached {} for {} on FlowFile {}",
+                            new Object[] {targetCounterName, targetCount, signalId, flowFile});
                 }
+                flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
+                session.transfer(flowFile, REL_WAIT);
+                return;
             }
 
+
+            flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
             session.transfer(flowFile, REL_SUCCESS);
 
-            cache.remove(cacheKey, keySerializer);
+            protocol.complete(signalId);
+
+        } catch (final NumberFormatException e) {
+            flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e});
+
         } catch (final IOException e) {
+            flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
         }
     }
 
-    /**
-     * Simple string serializer, used for serializing the cache key
-     */
-    public static class StringSerializer implements Serializer<String> {
+    private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
+        if (signal == null) {
+            return flowFile;
+        }
 
-        @Override
-        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
-            out.write(value.getBytes(StandardCharsets.UTF_8));
+        // copy over attributes from release signal flow file, if provided
+        final Map<String, String> attributesToCopy;
+        if (replaceOriginal) {
+            attributesToCopy = new HashMap<>(signal.getAttributes());
+            attributesToCopy.remove("uuid");
+        } else {
+            // if the current flow file does *not* have the cached attribute, copy it
+            attributesToCopy = signal.getAttributes().entrySet().stream()
+                    .filter(e -> flowFile.getAttribute(e.getKey()) == null)
+                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
         }
+
+        // Copy counter attributes
+        final Map<String, Long> counts = signal.getCounts();
+        final long totalCount = counts.entrySet().stream().mapToLong(e -> {
+            final Long count = e.getValue();
+            attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
+            return count;
+        }).sum();
+        attributesToCopy.put("wait.counter.total", String.valueOf(totalCount));
+
+        return session.putAllAttributes(flowFile, attributesToCopy);
     }
 
 }

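For reference, the release decision above and the 'wait.counter.*' attributes can be illustrated with a small standalone sketch. This is not part of the patch; it uses only the Signal class introduced in WaitNotifyProtocol.java below plus java.util types, the attribute map stands in for a FlowFile's attributes, and the counter names are illustrative:

import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;

public class WaitCounterSketch {
    public static void main(String[] args) {
        // A signal as a Notify processor might have built it: two named counters.
        final Signal signal = new Signal();
        signal.getCounts().put("success", 3L);
        signal.getCounts().put("failure", 1L);

        // Wait's release decision: a specific counter when 'Signal Counter Name'
        // is set, otherwise the total across all counters.
        final boolean counterReached = signal.isCountReached("success", 3); // true: 3 >= 3
        final boolean totalReached = signal.isTotalCountReached(5);         // false: 3 + 1 < 5

        // The counter attributes Wait copies onto the FlowFile before transfer.
        final Map<String, String> attributes = new HashMap<>();
        long total = 0;
        for (final Map.Entry<String, Long> e : signal.getCounts().entrySet()) {
            attributes.put("wait.counter." + e.getKey(), String.valueOf(e.getValue()));
            total += e.getValue();
        }
        attributes.put("wait.counter.total", String.valueOf(total));

        System.out.println(counterReached + " " + totalReached + " " + attributes);
    }
}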
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
new file mode 100644
index 0000000..a74590a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides a protocol for Wait and Notify processors to work together.
+ * Once an AtomicDistributedMapCacheClient is passed to this protocol, components that wish to join the notification mechanism
+ * should only use methods provided by this protocol, instead of calling the cache API directly.
+ */
+public class WaitNotifyProtocol {
+
+    private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);
+
+    public static final String DEFAULT_COUNT_NAME = "default";
+    private static final int MAX_REPLACE_RETRY_COUNT = 5;
+    private static final int REPLACE_RETRY_WAIT_MILLIS = 10;
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final Serializer<String> stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
+    private final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
+
+    public static class Signal {
+        private Map<String, Long> counts = new HashMap<>();
+        private Map<String, String> attributes = new HashMap<>();
+
+        public Map<String, Long> getCounts() {
+            return counts;
+        }
+
+        public void setCounts(Map<String, Long> counts) {
+            this.counts = counts;
+        }
+
+        public Map<String, String> getAttributes() {
+            return attributes;
+        }
+
+        public void setAttributes(Map<String, String> attributes) {
+            this.attributes = attributes;
+        }
+
+        public boolean isTotalCountReached(final long targetCount) {
+            final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum();
+            return totalCount >= targetCount;
+        }
+
+        public boolean isCountReached(final String counterName, final long targetCount) {
+            return getCount(counterName) >= targetCount;
+        }
+
+        public long getCount(final String counterName) {
+            final Long count = counts.get(counterName);
+            return count != null ? count : -1;
+        }
+
+    }
+
+    private final AtomicDistributedMapCacheClient cache;
+
+    public WaitNotifyProtocol(final AtomicDistributedMapCacheClient cache) {
+        this.cache = cache;
+    }
+
+    /**
+     * Notify a signal, incrementing one of its counters.
+     * @param signalId a key in the underlying cache engine
+     * @param counterName the name of the counter to update
+     * @param delta the delta by which to update the counter
+     * @param attributes attributes to save in the cache entry
+     * @return A Signal instance, merged with an existing signal if any
+     * @throws IOException thrown when interaction with the cache engine fails
+     * @throws ConcurrentModificationException thrown if another process is updating the same signal and this update still fails after a few retry attempts
+     */
+    public Signal notify(final String signalId, final String counterName, final int delta, final Map<String, String> attributes)
+            throws IOException, ConcurrentModificationException {
+
+        for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) {
+
+            final CacheEntry<String, String> existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer);
+
+            Signal signal = getSignal(signalId);
+            if (signal == null) {
+                signal = new Signal();
+            }
+
+            if (attributes != null) {
+                signal.attributes.putAll(attributes);
+            }
+
+            long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0;
+            count += delta;
+            signal.counts.put(counterName, count);
+
+            final String signalJson = objectMapper.writeValueAsString(signal);
+            final long revision = existingEntry != null ? existingEntry.getRevision() : -1;
+
+
+            if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) {
+                return signal;
+            }
+
+            long waitMillis = REPLACE_RETRY_WAIT_MILLIS * (i + 1);
+            logger.info("Waiting for {} ms to retry... {}.{}", waitMillis, signalId, counterName);
+            try {
+                Thread.sleep(waitMillis);
+            } catch (InterruptedException e) {
+                final String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, counterName);
+                throw new ConcurrentModificationException(msg, e);
+            }
+        }
+
+        final String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, counterName, MAX_REPLACE_RETRY_COUNT);
+        throw new ConcurrentModificationException(msg);
+    }
+
+    /**
+     * Retrieve a stored Signal from the cache engine.
+     * If a caller is satisfied with the returned Signal state and finishes waiting, it should call {@link #complete(String)}
+     * to complete the Wait/Notify protocol.
+     * @param signalId a key in the underlying cache engine
+     * @return A Signal instance, or null if no signal is stored for the given id
+     * @throws IOException thrown when interaction with the cache engine fails
+     * @throws DeserializationException thrown if the cached value is not in the expected serialized format
+     */
+    public Signal getSignal(final String signalId) throws IOException, DeserializationException {
+
+        final CacheEntry<String, String> entry = cache.fetch(signalId, stringSerializer, stringDeserializer);
+
+        if (entry == null) {
+            // No signal found.
+            return null;
+        }
+
+        final String value = entry.getValue();
+
+        try {
+            return objectMapper.readValue(value, Signal.class);
+        } catch (final JsonParseException jsonE) {
+            // Try to read it as FlowFileAttributes for backward compatibility.
+            try {
+                final Map<String, String> attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8));
+                final Signal signal = new Signal();
+                signal.setAttributes(attributes);
+                signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
+                return signal;
+            } catch (Exception attrE) {
+                final String msg = String.format("Cached value for %s was not a serialized Signal nor FlowFileAttributes. Error messages: \"%s\", \"%s\"",
+                        signalId, jsonE.getMessage(), attrE.getMessage());
+                throw new DeserializationException(msg);
+            }
+        }
+    }
+
+    /**
+     * Finish the protocol and remove the cache entry.
+     * @param signalId a key in the underlying cache engine
+     * @throws IOException thrown when interaction with the cache engine fails
+     */
+    public void complete(final String signalId) throws IOException {
+        cache.remove(signalId, stringSerializer);
+    }
+}

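To make the intended call sequence concrete, here is a minimal usage sketch of the protocol above (not part of the patch). The cacheClient parameter stands in for a configured AtomicDistributedMapCacheClient controller service, and the signal id and counter name are illustrative; note that notify() retries its optimistic replace internally, and getSignal() returns null until the first notification arrives:

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.processors.standard.WaitNotifyProtocol;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;

public class WaitNotifyRoundTrip {

    // 'cacheClient' is assumed to be obtained from a configured controller service.
    static void roundTrip(final AtomicDistributedMapCacheClient cacheClient) throws IOException {
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cacheClient);
        final String signalId = "a-fragment-identifier"; // e.g. the fragment.identifier value

        // Notify side: increment the 'splits' counter by 1 and carry attributes along.
        final Map<String, String> attrs = Collections.singletonMap("source", "SplitText");
        protocol.notify(signalId, "splits", 1, attrs);

        // Wait side: fetch the signal (null if nothing has been notified yet)
        // and test it against the target count.
        final Signal signal = protocol.getSignal(signalId);
        if (signal != null && signal.isCountReached("splits", 1)) {
            // Satisfied: complete the protocol so the cache entry is removed.
            protocol.complete(signalId);
        }
    }
}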
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index 7e95ada..f808072 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -37,6 +37,9 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+
 public class TestConvertJSONToSQL {
     static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
 
@@ -70,6 +73,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -106,6 +110,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -140,6 +145,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -175,6 +181,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@@ -210,6 +217,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
         final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
         for (final MockFlowFile mff : mffs) {
@@ -245,6 +253,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
         final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
         for (final MockFlowFile mff : mffs) {
@@ -279,6 +288,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@@ -314,6 +324,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@@ -349,6 +360,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
 
@@ -384,6 +396,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -420,6 +433,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -455,6 +469,9 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0);
+        originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
+        originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -590,6 +607,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -625,6 +643,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -687,6 +706,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -724,6 +744,7 @@ public class TestConvertJSONToSQL {
         runner.run();
 
         runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
         runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
         out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));