You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/01/19 20:18:28 UTC
[2/2] nifi git commit: NIFI-3216: Add N signals to Wait/Notify
NIFI-3216: Add N signals to Wait/Notify
- Support counters in the Wait/Notify processors so that a NiFi flow can be
configured to wait for N signals
- Extract Wait/Notify logic into WaitNotifyProtocol
- Added FragmentAttributes to manage commonly used fragment attributes
- Changed existing split processors to set 'fragment.identifier' and
'fragment.count', so that Wait can use those to wait until all splits
have been processed
This closes #1420.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f0171ff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f0171ff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f0171ff
Branch: refs/heads/master
Commit: 7f0171ffa25bfd8932e49b3367049b101799dea4
Parents: e62eeb7
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri Jan 13 16:52:30 2017 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Jan 19 15:17:59 2017 -0500
----------------------------------------------------------------------
.../flowfile/attributes/FragmentAttributes.java | 73 +++++
.../apache/nifi/processors/avro/SplitAvro.java | 17 +-
.../nifi/processors/avro/TestSplitAvro.java | 13 +-
.../processors/standard/ConvertJSONToSQL.java | 12 +-
.../nifi/processors/standard/MergeContent.java | 9 +-
.../apache/nifi/processors/standard/Notify.java | 52 ++--
.../apache/nifi/processors/standard/PutSQL.java | 7 +-
.../processors/standard/SegmentContent.java | 10 +-
.../nifi/processors/standard/SplitContent.java | 18 +-
.../nifi/processors/standard/SplitJson.java | 18 +-
.../nifi/processors/standard/SplitText.java | 21 +-
.../nifi/processors/standard/SplitXml.java | 19 +-
.../nifi/processors/standard/UnpackContent.java | 11 +-
.../apache/nifi/processors/standard/Wait.java | 163 +++++++---
.../processors/standard/WaitNotifyProtocol.java | 193 ++++++++++++
.../standard/TestConvertJSONToSQL.java | 21 ++
.../nifi/processors/standard/TestNotify.java | 128 ++++++--
.../processors/standard/TestSegmentContent.java | 10 +
.../processors/standard/TestSplitContent.java | 21 ++
.../nifi/processors/standard/TestSplitJson.java | 24 +-
.../nifi/processors/standard/TestSplitText.java | 25 +-
.../nifi/processors/standard/TestSplitXml.java | 17 +-
.../processors/standard/TestUnpackContent.java | 26 ++
.../nifi/processors/standard/TestWait.java | 311 ++++++++++++++-----
.../standard/TestWaitNotifyProtocol.java | 245 +++++++++++++++
25 files changed, 1214 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java
new file mode 100644
index 0000000..6f055da
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This enum class contains flow file attribute keys commonly used among Split processors.
+ */
+public enum FragmentAttributes implements FlowFileAttributeKey {
+
+ /**
+ * The number of bytes from the original FlowFile that were copied to this FlowFile,
+ * including header, if applicable, which is duplicated in each split FlowFile.
+ */
+ FRAGMENT_SIZE("fragment.size"),
+ /**
+ * All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute.
+ */
+ FRAGMENT_ID("fragment.identifier"),
+ /**
+ * A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile.
+ */
+ FRAGMENT_INDEX("fragment.index"),
+ /**
+ * The number of split FlowFiles generated from the parent FlowFile.
+ */
+ FRAGMENT_COUNT("fragment.count"),
+ /**
+ * The filename of the parent FlowFile.
+ */
+ SEGMENT_ORIGINAL_FILENAME("segment.original.filename");
+
+ private final String key;
+
+ FragmentAttributes(final String key) {
+ this.key = key;
+ }
+
+ @Override
+ public String key() {
+ return key;
+ }
+
+ public static FlowFile copyAttributesToOriginal(final ProcessSession processSession, final FlowFile originalFlowFile,
+ final String fragmentId, final int fragmentCount) {
+ final Map<String, String> attributesToOriginal = new HashMap<>();
+ if (fragmentId != null && fragmentId.length() > 0) {
+ attributesToOriginal.put(FRAGMENT_ID.key(), fragmentId);
+ }
+ attributesToOriginal.put(FRAGMENT_COUNT.key(), String.valueOf(fragmentCount));
+ return processSession.putAllAttributes(originalFlowFile, attributesToOriginal);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
index 83964fa..3a28917 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
@@ -64,6 +64,12 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
@SideEffectFree
@SupportsBatching
@Tags({ "avro", "split" })
@@ -217,13 +223,14 @@ public class SplitAvro extends AbstractProcessor {
final String fragmentIdentifier = UUID.randomUUID().toString();
IntStream.range(0, splits.size()).forEach((i) -> {
FlowFile split = splits.get(i);
- split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
- split = session.putAttribute(split, "fragment.index", Integer.toString(i));
- split = session.putAttribute(split, "segment.original.filename", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
- split = session.putAttribute(split, "fragment.count", Integer.toString(splits.size()));
+ split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier);
+ split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(i));
+ split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(splits.size()));
session.transfer(split, REL_SPLIT);
});
- session.transfer(flowFile, REL_ORIGINAL);
+ final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size());
+ session.transfer(originalFlowFile, REL_ORIGINAL);
} catch (ProcessException e) {
getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
session.transfer(flowFile, REL_FAILURE);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
index 32d43e3..c17842d 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
@@ -44,6 +44,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.stream.IntStream;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.junit.Assert.assertEquals;
public class TestSplitAvro {
@@ -116,6 +118,9 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
+ originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 1, true);
final String fragmentIdentifier = flowFiles.get(0).getAttribute("fragment.identifier");
@@ -123,7 +128,7 @@ public class TestSplitAvro {
MockFlowFile flowFile = flowFiles.get(i);
assertEquals(i, Integer.parseInt(flowFile.getAttribute("fragment.index")));
assertEquals(fragmentIdentifier, flowFile.getAttribute("fragment.identifier"));
- assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute("fragment.count")));
+ assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute(FRAGMENT_COUNT.key())));
assertEquals(filename, flowFile.getAttribute("segment.original.filename"));
});
}
@@ -140,6 +145,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 20, true);
}
@@ -156,6 +162,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 100, true);
}
@@ -172,6 +179,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 1, false);
@@ -197,6 +205,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 1, true);
@@ -215,6 +224,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 20, true);
@@ -234,6 +244,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+ runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 20, false);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 2db7bcc..1ce6fb5 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -65,6 +65,11 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
@SideEffectFree
@SupportsBatching
@SeeAlso(PutSQL.class)
@@ -369,9 +374,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
attributes.put("sql.table", tableName);
- attributes.put("fragment.identifier", fragmentIdentifier);
- attributes.put("fragment.count", String.valueOf(arrayNode.size()));
- attributes.put("fragment.index", String.valueOf(i));
+ attributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
+ attributes.put(FRAGMENT_COUNT.key(), String.valueOf(arrayNode.size()));
+ attributes.put(FRAGMENT_INDEX.key(), String.valueOf(i));
if (catalog != null) {
attributes.put("sql.catalog", catalog);
@@ -381,6 +386,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
session.transfer(sqlFlowFile, REL_SQL);
}
+ flowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, arrayNode.size());
session.transfer(flowFile, REL_ORIGINAL);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 16bb911..f18416e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -67,6 +67,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -127,15 +128,15 @@ import org.apache.nifi.util.FlowFilePackagerV3;
public class MergeContent extends BinFiles {
// preferred attributes
- public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
- public static final String FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
- public static final String FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
+ public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
// old style attributes
public static final String SEGMENT_ID_ATTRIBUTE = "segment.identifier";
public static final String SEGMENT_INDEX_ATTRIBUTE = "segment.index";
public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
"Bin-Packing Algorithm",
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
index 32cd249..9d809ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
@@ -17,8 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,9 +35,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -49,7 +45,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
@EventDriven
@SupportsBatching
@@ -67,7 +62,7 @@ public class Notify extends AbstractProcessor {
.name("Distributed Cache Service")
.description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor")
.required(true)
- .identifiesControllerService(DistributedMapCacheClient.class)
+ .identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
@@ -80,6 +75,18 @@ public class Notify extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
+ public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
+ .name("Signal Counter Name")
+ .description("A value, or the results of an Attribute Expression Language statement, which will " +
+ "be evaluated against a FlowFile in order to determine the signal counter name. " +
+ "Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " +
+ "of different types of events, such as success or failure, or destination data source names, etc.")
+ .required(true)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+ .expressionLanguageSupported(true)
+ .defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME)
+ .build();
+
// Specifies an optional regex used to identify which attributes to cache
public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder()
.name("Attribute Cache Regex")
@@ -103,9 +110,6 @@ public class Notify extends AbstractProcessor {
private final Set<Relationship> relationships;
- private final Serializer<String> keySerializer = new StringSerializer();
- private final Serializer<Map<String, String>> valueSerializer = new FlowFileAttributesSerializer();
-
public Notify() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
@@ -117,6 +121,7 @@ public class Notify extends AbstractProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
+ descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_CACHE_REGEX);
return descriptors;
@@ -137,11 +142,12 @@ public class Notify extends AbstractProcessor {
final ComponentLog logger = getLogger();
- // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
- final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+ // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
+ final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+ final String counterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
// if the computed value is null, or empty, we transfer the flow file to failure relationship
- if (StringUtils.isBlank(cacheKey)) {
+ if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
@@ -149,7 +155,8 @@ public class Notify extends AbstractProcessor {
}
// the cache client used to interact with the distributed cache
- final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+ final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
try {
final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet())
@@ -164,10 +171,12 @@ public class Notify extends AbstractProcessor {
}
if (logger.isDebugEnabled()) {
- logger.debug("Cached release signal identifier {} from FlowFile {}", new Object[] {cacheKey, flowFile});
+ logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[] {signalId, counterName, flowFile});
}
- cache.put(cacheKey, attributesToCache, keySerializer, valueSerializer);
+ // In case of ConcurrentModificationException, just throw the exception so that processor can
+ // retry after yielding for a while.
+ protocol.notify(signalId, counterName, 1, attributesToCache);
session.transfer(flowFile, REL_SUCCESS);
} catch (final IOException e) {
@@ -177,15 +186,4 @@ public class Notify extends AbstractProcessor {
}
}
- /**
- * Simple string serializer, used for serializing the cache key
- */
- public static class StringSerializer implements Serializer<String> {
-
- @Override
- public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
- out.write(value.getBytes(StandardCharsets.UTF_8));
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index adfac05..76901fe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
@@ -161,9 +162,9 @@ public class PutSQL extends AbstractProcessor {
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
- private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
- private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
- private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
+ private static final String FRAGMENT_ID_ATTR = FragmentAttributes.FRAGMENT_ID.key();
+ private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key();
+ private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key();
private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$");
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
index 4e25d3c..1fc1feb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
@@ -38,6 +38,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@@ -80,11 +81,11 @@ public class SegmentContent extends AbstractProcessor {
public static final String SEGMENT_ID = "segment.identifier";
public static final String SEGMENT_INDEX = "segment.index";
public static final String SEGMENT_COUNT = "segment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
- public static final String FRAGMENT_ID = "fragment.identifier";
- public static final String FRAGMENT_INDEX = "fragment.index";
- public static final String FRAGMENT_COUNT = "fragment.count";
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder()
.name("Segment Size")
@@ -180,6 +181,7 @@ public class SegmentContent extends AbstractProcessor {
}
session.transfer(segmentSet, REL_SEGMENTS);
+ flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, segmentId, totalSegments);
session.transfer(flowFile, REL_ORIGINAL);
if (totalSegments <= 10) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
index 52b124e..d20fe8c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
@@ -50,6 +50,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -77,10 +78,10 @@ import org.apache.nifi.util.Tuple;
public class SplitContent extends AbstractProcessor {
// attribute keys
- public static final String FRAGMENT_ID = "fragment.identifier";
- public static final String FRAGMENT_INDEX = "fragment.index";
- public static final String FRAGMENT_COUNT = "fragment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes");
static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text");
@@ -182,7 +183,7 @@ public class SplitContent extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
- final FlowFile flowFile = session.get();
+ FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
@@ -285,8 +286,9 @@ public class SplitContent extends AbstractProcessor {
splitList.add(finalSplit);
}
- finishFragmentAttributes(session, flowFile, splitList);
+ final String fragmentId = finishFragmentAttributes(session, flowFile, splitList);
session.transfer(splitList, REL_SPLITS);
+ flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, splitList.size());
session.transfer(flowFile, REL_ORIGINAL);
if (splitList.size() > 10) {
@@ -302,8 +304,9 @@ public class SplitContent extends AbstractProcessor {
* @param session session
* @param source source
* @param splits splits
+ * @return generated fragment identifier for the splits
*/
- private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
+ private String finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
final String fragmentId = UUID.randomUUID().toString();
@@ -319,6 +322,7 @@ public class SplitContent extends AbstractProcessor {
FlowFile newFF = session.putAllAttributes(ff, attributes);
splits.add(newFF);
}
+ return fragmentId;
}
static class HexStringPropertyValidator implements Validator {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index 003834e..e7df459 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -56,6 +56,12 @@ import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
@EventDriven
@SideEffectFree
@SupportsBatching
@@ -166,7 +172,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
@Override
public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
- final FlowFile original = processSession.get();
+ FlowFile original = processSession.get();
if (original == null) {
return;
}
@@ -202,8 +208,9 @@ public class SplitJson extends AbstractJsonPathProcessor {
List resultList = (List) jsonPathResult;
Map<String, String> attributes = new HashMap<>();
- attributes.put("fragment.identifier", UUID.randomUUID().toString());
- attributes.put("fragment.count", Integer.toString(resultList.size()));
+ final String fragmentId = UUID.randomUUID().toString();
+ attributes.put(FRAGMENT_ID.key(), fragmentId);
+ attributes.put(FRAGMENT_COUNT.key(), Integer.toString(resultList.size()));
for (int i = 0; i < resultList.size(); i++) {
Object resultSegment = resultList.get(i);
@@ -213,11 +220,12 @@ public class SplitJson extends AbstractJsonPathProcessor {
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
}
);
- attributes.put("segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
- attributes.put("fragment.index", Integer.toString(i));
+ attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key()));
+ attributes.put(FRAGMENT_INDEX.key(), Integer.toString(i));
processSession.transfer(processSession.putAllAttributes(split, attributes), REL_SPLIT);
}
+ original = copyAttributesToOriginal(processSession, original, fragmentId, resultList.size());
processSession.transfer(original, REL_ORIGINAL);
logger.info("Split {} into {} FlowFiles", new Object[]{original, resultList.size()});
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index ddb770d..e57841f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@ -48,6 +48,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@@ -86,11 +87,11 @@ import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
public class SplitText extends AbstractProcessor {
// attribute keys
public static final String SPLIT_LINE_COUNT = "text.line.count";
- public static final String FRAGMENT_SIZE = "fragment.size";
- public static final String FRAGMENT_ID = "fragment.identifier";
- public static final String FRAGMENT_INDEX = "fragment.index";
- public static final String FRAGMENT_COUNT = "fragment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String FRAGMENT_SIZE = FragmentAttributes.FRAGMENT_SIZE.key();
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder()
.name("Line Split Count")
@@ -250,8 +251,10 @@ public class SplitText extends AbstractProcessor {
if (error.get()){
processSession.transfer(sourceFlowFile, REL_FAILURE);
} else {
- List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
- processSession.transfer(sourceFlowFile, REL_ORIGINAL);
+ final String fragmentId = UUID.randomUUID().toString();
+ List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
+ final FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal(processSession, sourceFlowFile, fragmentId, splitFlowFiles.size());
+ processSession.transfer(originalFlowFile, REL_ORIGINAL);
if (!splitFlowFiles.isEmpty()) {
processSession.transfer(splitFlowFiles, REL_SPLITS);
}
@@ -279,7 +282,8 @@ public class SplitText extends AbstractProcessor {
* it signifies the header information and its contents will be included in
* each and every computed split.
*/
- private List<FlowFile> generateSplitFlowFiles(FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
+ private List<FlowFile> generateSplitFlowFiles(final String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo,
+ List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
List<FlowFile> splitFlowFiles = new ArrayList<>();
FlowFile headerFlowFile = null;
long headerCrlfLength = 0;
@@ -288,7 +292,6 @@ public class SplitText extends AbstractProcessor {
headerCrlfLength = splitInfo.trimmedLength;
}
int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
- String fragmentId = UUID.randomUUID().toString();
if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) {
FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
index 0f0032a..502f7f3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
@@ -64,6 +64,12 @@ import org.xml.sax.Locator;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
@EventDriven
@SideEffectFree
@SupportsBatching
@@ -161,9 +167,9 @@ public class SplitXml extends AbstractProcessor {
final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> {
FlowFile split = session.create(original);
split = session.write(split, out -> out.write(xmlTree.getBytes("UTF-8")));
- split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
- split = session.putAttribute(split, "fragment.index", Integer.toString(numberOfRecords.getAndIncrement()));
- split = session.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
+ split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier);
+ split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(numberOfRecords.getAndIncrement()));
+ split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key()));
splits.add(split);
}, depth);
@@ -188,12 +194,13 @@ public class SplitXml extends AbstractProcessor {
session.remove(splits);
} else {
splits.forEach((split) -> {
- split = session.putAttribute(split, "fragment.count", Integer.toString(numberOfRecords.get()));
+ split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(numberOfRecords.get()));
session.transfer(split, REL_SPLIT);
});
- session.transfer(original, REL_ORIGINAL);
- logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()});
+ final FlowFile originalToTransfer = copyAttributesToOriginal(session, original, fragmentIdentifier, numberOfRecords.get());
+ session.transfer(originalToTransfer, REL_ORIGINAL);
+ logger.info("Split {} into {} FlowFiles", new Object[]{originalToTransfer, splits.size()});
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 0437ed1..18015fa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -53,6 +53,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -97,10 +98,10 @@ import org.apache.nifi.util.FlowFileUnpackagerV3;
@SeeAlso(MergeContent.class)
public class UnpackContent extends AbstractProcessor {
// attribute keys
- public static final String FRAGMENT_ID = "fragment.identifier";
- public static final String FRAGMENT_INDEX = "fragment.index";
- public static final String FRAGMENT_COUNT = "fragment.count";
- public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final String AUTO_DETECT_FORMAT_NAME = "use mime.type attribute";
public static final String TAR_FORMAT_NAME = "tar";
@@ -262,6 +263,8 @@ public class UnpackContent extends AbstractProcessor {
finishFragmentAttributes(session, flowFile, unpacked);
}
session.transfer(unpacked, REL_SUCCESS);
+ final String fragmentId = unpacked.size() > 0 ? unpacked.get(0).getAttribute(FRAGMENT_ID) : null;
+ flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size());
session.transfer(flowFile, REL_ORIGINAL);
session.getProvenanceReporter().fork(flowFile, unpacked);
logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked});
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 4a4f803..0bd5ca6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -17,8 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
@@ -35,15 +34,13 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -53,19 +50,30 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal "
- + "is stored in the distributed cache from a corresponding Notify processor. At this point, a waiting FlowFile is routed to "
- + "the 'success' relationship, with attributes copied from the FlowFile that produced "
- + "the release signal from the Notify processor. The release signal entry is then removed from "
- + "the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration.")
-@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
- + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile.")
+ + "is stored in the distributed cache from a corresponding Notify processor. "
+ + "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship, "
+ + "with attributes copied from the FlowFile that produced the release signal from the Notify processor. "
+ + "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+
+ + "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
+ + "This is particularly useful with processors that split a source flow file into multiple fragments, such as SplitText. "
+ + "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to "
+ + "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value "
+ + "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor."
+)
+@WritesAttributes({
+ @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+ + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile."),
+ @WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, "
+ + "each count value in the signal is copied.")
+})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.Notify"})
public class Wait extends AbstractProcessor {
@@ -77,7 +85,7 @@ public class Wait extends AbstractProcessor {
.name("Distributed Cache Service")
.description("The Controller Service that is used to check for release signals from a corresponding Notify processor")
.required(true)
- .identifiesControllerService(DistributedMapCacheClient.class)
+ .identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
@@ -90,6 +98,29 @@ public class Wait extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
+ public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
+ .name("Target Signal Count")
+ .description("A value, or the results of an Attribute Expression Language statement, which will " +
+ "be evaluated against a FlowFile in order to determine the target signal count. " +
+ "This processor checks whether the signal count has reached this number. " +
+ "If Signal Counter Name is specified, this processor checks a particular counter, " +
+ "otherwise checks against total count in a signal.")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
+ .name("Signal Counter Name")
+ .description("A value, or the results of an Attribute Expression Language statement, which will " +
+ "be evaluated against a FlowFile in order to determine the signal counter name. " +
+ "If not specified, this processor checks the total count in a signal.")
+ .required(false)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+ .expressionLanguageSupported(true)
+ .build();
+
// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
.name("Expiration Duration")
@@ -136,9 +167,6 @@ public class Wait extends AbstractProcessor {
.build();
private final Set<Relationship> relationships;
- private final Serializer<String> keySerializer = new StringSerializer();
- private final Deserializer<Map<String, String>> valueDeserializer = new FlowFileAttributesSerializer();
-
public Wait() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
@@ -152,6 +180,8 @@ public class Wait extends AbstractProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
+ descriptors.add(TARGET_SIGNAL_COUNT);
+ descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(EXPIRATION_DURATION);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_COPY_MODE);
@@ -173,11 +203,11 @@ public class Wait extends AbstractProcessor {
final ComponentLog logger = getLogger();
- // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
- final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+ // Signal id is computed from the 'RELEASE_SIGNAL_IDENTIFIER' property, with expression language evaluated against the FlowFile
+ final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
// if the computed value is null, or empty, we transfer the flow file to failure relationship
- if (StringUtils.isBlank(cacheKey)) {
+ if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
@@ -185,9 +215,17 @@ public class Wait extends AbstractProcessor {
}
// the cache client used to interact with the distributed cache
- final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+ final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
+
+ String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
+ final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
+ Signal signal = null;
try {
+ // get notifying signal
+ signal = protocol.getSignal(signalId);
+
// check for expiration
String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
if (waitStartTimestamp == null) {
@@ -201,6 +239,8 @@ public class Wait extends AbstractProcessor {
} catch (NumberFormatException nfe) {
logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
flowFile = session.penalize(flowFile);
+
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -209,58 +249,83 @@ public class Wait extends AbstractProcessor {
long now = System.currentTimeMillis();
if (now > (lWaitStartTimestamp + expirationDuration)) {
logger.warn("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)});
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_EXPIRED);
return;
}
- // get notifying flow file attributes
- Map<String, String> cachedAttributes = cache.get(cacheKey, keySerializer, valueDeserializer);
-
- if (cachedAttributes == null) {
+ if (signal == null) {
+ // If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
if (logger.isDebugEnabled()) {
- logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {cacheKey, flowFile});
+ logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile});
}
session.transfer(flowFile, REL_WAIT);
return;
}
- // copy over attributes from release signal flow file, if provided
- if (!cachedAttributes.isEmpty()) {
- cachedAttributes.remove("uuid");
- String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
- if (ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode)) {
- flowFile = session.putAllAttributes(flowFile, cachedAttributes);
- } else {
- Map<String, String> attributesToCopy = new HashMap<>();
- for(Entry<String, String> entry : cachedAttributes.entrySet()) {
- // if the current flow file does *not* have the cached attribute, copy it
- if (flowFile.getAttribute(entry.getKey()) == null) {
- attributesToCopy.put(entry.getKey(), entry.getValue());
- }
- }
- flowFile = session.putAllAttributes(flowFile, attributesToCopy);
+ final String targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final Long targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
+ final boolean reachedToTargetCount = StringUtils.isBlank(targetCounterName)
+ ? signal.isTotalCountReached(targetCount)
+ : signal.isCountReached(targetCounterName, targetCount);
+
+ if (!reachedToTargetCount) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Release signal count {} hasn't reached {} for {} on FlowFile {}",
+ new Object[] {targetCounterName, targetCount, signalId, flowFile});
}
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
+ session.transfer(flowFile, REL_WAIT);
+ return;
}
+
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_SUCCESS);
- cache.remove(cacheKey, keySerializer);
+ protocol.complete(signalId);
+
+ } catch (final NumberFormatException e) {
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e});
+
} catch (final IOException e) {
+ flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
}
}
- /**
- * Simple string serializer, used for serializing the cache key
- */
- public static class StringSerializer implements Serializer<String> {
+ private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
+ if (signal == null) {
+ return flowFile;
+ }
- @Override
- public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
- out.write(value.getBytes(StandardCharsets.UTF_8));
+ // copy over attributes from release signal flow file, if provided
+ final Map<String, String> attributesToCopy;
+ if (replaceOriginal) {
+ attributesToCopy = new HashMap<>(signal.getAttributes());
+ attributesToCopy.remove("uuid");
+ } else {
+ // if the current flow file does *not* have the cached attribute, copy it
+ attributesToCopy = signal.getAttributes().entrySet().stream()
+ .filter(e -> flowFile.getAttribute(e.getKey()) == null)
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
}
+
+ // Copy counter attributes
+ final Map<String, Long> counts = signal.getCounts();
+ final long totalCount = counts.entrySet().stream().mapToLong(e -> {
+ final Long count = e.getValue();
+ attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
+ return count;
+ }).sum();
+ attributesToCopy.put("wait.counter.total", String.valueOf(totalCount));
+
+ return session.putAllAttributes(flowFile, attributesToCopy);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
new file mode 100644
index 0000000..a74590a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides a protocol for Wait and Notify processors to work together.
+ * Once an AtomicDistributedMapCacheClient is passed to this protocol, components that wish to join the notification mechanism
+ * should only use methods provided by this protocol, instead of calling the cache API directly.
+ */
+public class WaitNotifyProtocol {
+
+    private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);
+
+    public static final String DEFAULT_COUNT_NAME = "default";
+    private static final int MAX_REPLACE_RETRY_COUNT = 5;
+    private static final int REPLACE_RETRY_WAIT_MILLIS = 10;
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final Serializer<String> stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
+    private static final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
+
+    /**
+     * State stored in the cache for one signal id: named counters plus the attributes passed to
+     * {@link WaitNotifyProtocol#notify(String, String, int, Map)}.
+     * Instances are written to and read from the cache as JSON through the shared ObjectMapper.
+     */
+    public static class Signal {
+        private Map<String, Long> counts = new HashMap<>();
+        private Map<String, String> attributes = new HashMap<>();
+
+        public Map<String, Long> getCounts() {
+            return counts;
+        }
+
+        public void setCounts(Map<String, Long> counts) {
+            this.counts = counts;
+        }
+
+        public Map<String, String> getAttributes() {
+            return attributes;
+        }
+
+        public void setAttributes(Map<String, String> attributes) {
+            this.attributes = attributes;
+        }
+
+        /**
+         * @param targetCount threshold to compare against
+         * @return true if the sum of all counters is greater than or equal to the target
+         */
+        public boolean isTotalCountReached(final long targetCount) {
+            final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum();
+            return totalCount >= targetCount;
+        }
+
+        /**
+         * @param counterName counter to check
+         * @param targetCount threshold to compare against
+         * @return true if the named counter is greater than or equal to the target;
+         *         an unknown counter counts as -1, so it does not even reach a target of 0
+         */
+        public boolean isCountReached(final String counterName, final long targetCount) {
+            return getCount(counterName) >= targetCount;
+        }
+
+        /**
+         * @param counterName counter to read
+         * @return the current value of the counter, or -1 if this signal has no such counter
+         */
+        public long getCount(final String counterName) {
+            final Long count = counts.get(counterName);
+            return count != null ? count : -1;
+        }
+
+    }
+
+    private final AtomicDistributedMapCacheClient cache;
+
+    public WaitNotifyProtocol(final AtomicDistributedMapCacheClient cache) {
+        this.cache = cache;
+    }
+
+    /**
+     * Notify a signal to increase a counter.
+     * Each attempt fetches the cache entry once, applies the update to that state and replaces the entry
+     * using the revision of the very same fetch, so a concurrent update is detected instead of overwritten.
+     * @param signalId a key in the underlying cache engine
+     * @param counterName specify count to update
+     * @param delta delta to update a counter
+     * @param attributes attributes to save in the cache entry, may be null
+     * @return A Signal instance, merged with an existing signal if any
+     * @throws IOException thrown when it failed interacting with the cache engine
+     * @throws ConcurrentModificationException thrown if other process is also updating the same signal and failed to update after few retry attempts,
+     *         or if the thread was interrupted while waiting for a retry (the interrupt flag is restored in that case)
+     */
+    public Signal notify(final String signalId, final String counterName, final int delta, final Map<String, String> attributes)
+            throws IOException, ConcurrentModificationException {
+
+        for (int attempt = 1; attempt <= MAX_REPLACE_RETRY_COUNT; attempt++) {
+
+            // Parse the entry we just fetched rather than fetching again: state and revision must describe the same snapshot.
+            final CacheEntry<String, String> existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer);
+            final Signal signal = existingEntry != null ? toSignal(signalId, existingEntry) : new Signal();
+
+            if (attributes != null) {
+                signal.attributes.putAll(attributes);
+            }
+
+            // A counter that does not exist yet starts from 0.
+            signal.counts.merge(counterName, (long) delta, Long::sum);
+
+            final String signalJson = objectMapper.writeValueAsString(signal);
+            final long revision = existingEntry != null ? existingEntry.getRevision() : -1;
+
+            if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) {
+                return signal;
+            }
+
+            if (attempt == MAX_REPLACE_RETRY_COUNT) {
+                // No retry follows the last attempt, so there is nothing to wait for.
+                break;
+            }
+
+            final long waitMillis = (long) REPLACE_RETRY_WAIT_MILLIS * attempt;
+            logger.info("Waiting for {} ms to retry... {}.{}", waitMillis, signalId, counterName);
+            try {
+                Thread.sleep(waitMillis);
+            } catch (InterruptedException e) {
+                // Restore the flag so that callers further up can still observe the interruption.
+                Thread.currentThread().interrupt();
+                final String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, counterName);
+                throw new ConcurrentModificationException(msg, e);
+            }
+        }
+
+        final String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, counterName, MAX_REPLACE_RETRY_COUNT);
+        throw new ConcurrentModificationException(msg);
+    }
+
+    /**
+     * Retrieve a stored Signal in the cache engine.
+     * If a caller gets satisfied with the returned Signal state and finish waiting, it should call {@link #complete(String)}
+     * to complete the Wait Notify protocol.
+     * @param signalId a key in the underlying cache engine
+     * @return A Signal instance, or null if no entry is stored for the signal id
+     * @throws IOException thrown when it failed interacting with the cache engine
+     * @throws DeserializationException thrown if the cache found is not in expected serialized format
+     */
+    public Signal getSignal(final String signalId) throws IOException, DeserializationException {
+
+        final CacheEntry<String, String> entry = cache.fetch(signalId, stringSerializer, stringDeserializer);
+
+        // A null entry means no signal was found.
+        return entry != null ? toSignal(signalId, entry) : null;
+    }
+
+    /**
+     * Convert the value of a fetched cache entry into a Signal.
+     * The value is parsed as JSON first; if it is not JSON it is read as serialized FlowFile attributes,
+     * in which case the resulting Signal carries those attributes and a {@link #DEFAULT_COUNT_NAME} counter of 1.
+     * @param signalId key of the entry, used for the error message only
+     * @param entry a non-null cache entry
+     * @return the Signal represented by the entry value
+     * @throws IOException thrown when the JSON could not be read for a reason other than a parse error
+     * @throws DeserializationException thrown if the value is in neither of the two formats
+     */
+    private static Signal toSignal(final String signalId, final CacheEntry<String, String> entry) throws IOException, DeserializationException {
+
+        final String value = entry.getValue();
+
+        try {
+            return objectMapper.readValue(value, Signal.class);
+        } catch (final JsonParseException jsonE) {
+            // Try to read it as FlowFileAttributes for backward compatibility.
+            try {
+                final Map<String, String> attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8));
+                final Signal signal = new Signal();
+                signal.setAttributes(attributes);
+                signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
+                return signal;
+            } catch (Exception attrE) {
+                final String msg = String.format("Cached value for %s was not a serialized Signal nor FlowFileAttributes. Error messages: \"%s\", \"%s\"",
+                        signalId, jsonE.getMessage(), attrE.getMessage());
+                final DeserializationException failure = new DeserializationException(msg);
+                // Keep both stack traces available for troubleshooting, not just their messages.
+                failure.addSuppressed(jsonE);
+                failure.addSuppressed(attrE);
+                throw failure;
+            }
+        }
+    }
+
+    /**
+     * Finish protocol and remove the cache entry.
+     * @param signalId a key in the underlying cache engine
+     * @throws IOException thrown when it failed interacting with the cache engine
+     */
+    public void complete(final String signalId) throws IOException {
+        cache.remove(signalId, stringSerializer);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f0171ff/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index 7e95ada..f808072 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -37,6 +37,9 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
+import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
+
public class TestConvertJSONToSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
@@ -70,6 +73,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -106,6 +110,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -140,6 +145,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -175,6 +181,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@@ -210,6 +217,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
for (final MockFlowFile mff : mffs) {
@@ -245,6 +253,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
for (final MockFlowFile mff : mffs) {
@@ -279,6 +288,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@@ -314,6 +324,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@@ -349,6 +360,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
@@ -384,6 +396,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -420,6 +433,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -455,6 +469,9 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0);
+ originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
+ originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -590,6 +607,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -625,6 +643,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -687,6 +706,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@@ -724,6 +744,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));