You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/02 21:53:06 UTC
[5/6] incubator-nifi git commit: NIFI-305: Slight refactorings to
provide more flexibility in concrete implementations
NIFI-305: Slight refactorings to provide more flexibility in concrete implementations
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ec7f7e77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ec7f7e77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ec7f7e77
Branch: refs/heads/develop
Commit: ec7f7e7717750276a6cbf56edbfcca49b1299fa5
Parents: c01dff5
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 13:51:51 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 13:51:51 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/BinFiles.java | 72 +++-----------------
.../nifi/processors/standard/MergeContent.java | 69 ++++++++++---------
2 files changed, 46 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index 0a65c59..3d7dba1 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -19,11 +19,8 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -39,7 +36,6 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@@ -99,38 +95,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build();
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> descriptors;
private final BinManager binManager = new BinManager();
-
private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
- @Override
- protected final void init(final ProcessorInitializationContext context) {
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_ORIGINAL);
- relationships.add(REL_FAILURE);
- Set<Relationship> additionalRelationships = defineAdditionalRelationships();
- if (additionalRelationships != null) {
- relationships.addAll(additionalRelationships);
- }
- this.relationships = Collections.unmodifiableSet(relationships);
-
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(MIN_ENTRIES);
- descriptors.add(MAX_ENTRIES);
- descriptors.add(MIN_SIZE);
- descriptors.add(MAX_SIZE);
- descriptors.add(MAX_BIN_AGE);
- descriptors.add(MAX_BIN_COUNT);
- List<PropertyDescriptor> additionalPropertyDescriptors = this.defineAdditionalPropertyDescriptors();
- if (additionalPropertyDescriptors != null) {
- descriptors.addAll(additionalPropertyDescriptors);
- }
-
- this.descriptors = Collections.unmodifiableList(descriptors);
- }
@OnStopped
public final void resetState() {
@@ -144,27 +111,6 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
}
- @Override
- public final Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
- }
-
- /**
- * Allows any additional relationships to be defined.
- * @return Relationships to be added in the init() method
- */
- protected abstract Set<Relationship> defineAdditionalRelationships();
-
- /**
- * Allows any additional property descriptors to be defined.
- * @return Properties to be added in the init() method
- */
- protected abstract List<PropertyDescriptor> defineAdditionalPropertyDescriptors();
/**
* Allows general pre-processing of a flow file before it is offered to a
@@ -213,14 +159,14 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
* to Failure and commit their sessions. If false, the
* processBins() method will transfer the files to Original and commit
* the sessions
- * @throws Exception
- * This will be handled appropriately, and all flow files in the
- * bin will be transferred to failure and the session rolled
- * back
+ *
+ * @throws ProcessException if any problem arises while processing a bin
+ * of FlowFiles. In that case, all FlowFiles in the
+ * bin will be transferred to failure and the ProcessSession provided by
+ * the 'session' argument will be rolled back
*/
- protected abstract boolean processBin(Bin unmodifiableBin,
- List<FlowFileSessionWrapper> binContents, ProcessContext context,
- ProcessSession session, ProcessorLog logger) throws Exception;
+ protected abstract boolean processBin(Bin unmodifiableBin,
+ List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException;
/**
* Allows additional custom validation to be done. This will be called from
@@ -288,8 +234,8 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
boolean binAlreadyCommitted = false;
try {
- binAlreadyCommitted = this.processBin(bin, binCopy, context, session, logger);
- } catch (final Exception e) {
+ binAlreadyCommitted = this.processBin(bin, binCopy, context, session);
+ } catch (final ProcessException e) {
logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
for (final FlowFileSessionWrapper wrapper : binCopy) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 73cb5a6..a78bc07 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -46,10 +46,10 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@@ -198,56 +198,62 @@ public class MergeContent extends BinFiles {
public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+
@Override
- protected Set<Relationship> defineAdditionalRelationships() {
- final Set<Relationship> relationships = new HashSet<>();
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_ORIGINAL);
+ relationships.add(REL_FAILURE);
relationships.add(REL_MERGED);
-
return relationships;
- }
-
+ }
+
+
@Override
- protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() {
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(MERGE_STRATEGY);
- descriptors.add(MERGE_FORMAT);
- descriptors.add(ATTRIBUTE_STRATEGY);
- descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(MERGE_STRATEGY);
+ descriptors.add(MERGE_FORMAT);
+ descriptors.add(ATTRIBUTE_STRATEGY);
+ descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+ descriptors.add(MIN_ENTRIES);
+ descriptors.add(MAX_ENTRIES);
+ descriptors.add(MIN_SIZE);
+ descriptors.add(MAX_SIZE);
+ descriptors.add(MAX_BIN_AGE);
+ descriptors.add(MAX_BIN_COUNT);
descriptors.add(HEADER);
descriptors.add(FOOTER);
descriptors.add(DEMARCATOR);
descriptors.add(COMPRESSION_LEVEL);
descriptors.add(KEEP_PATH);
-
return descriptors;
}
-
+
private byte[] readContent(final String filename) throws IOException {
return Files.readAllBytes(Paths.get(filename));
}
@Override
- protected FlowFile preprocessFlowFile(ProcessContext context,
- ProcessSession session, FlowFile flowFile) {
-
+ protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+ FlowFile processed = flowFile;
// handle backward compatibility with old segment attributes
- if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
- flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+ if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+ processed = session.putAttribute(processed, FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
}
- if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
- flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
+ if (processed.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+ processed = session.putAttribute(processed, FRAGMENT_INDEX_ATTRIBUTE, processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
}
- if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
- flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+ if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+ processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE, processed.getAttribute(SEGMENT_ID_ATTRIBUTE));
}
- return flowFile;
+ return processed;
}
@Override
- protected String getGroupId(ProcessContext context, FlowFile flowFile) {
-
+ protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
@@ -260,16 +266,15 @@ public class MergeContent extends BinFiles {
}
@Override
- protected void setUpBinManager(BinManager binManager, ProcessContext context) {
+ protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
}
}
@Override
- protected boolean processBin(Bin unmodifiableBin,
- List<FlowFileSessionWrapper> binCopy, ProcessContext context,
- ProcessSession session, ProcessorLog logger) throws Exception {
+ protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy, final ProcessContext context,
+ final ProcessSession session) throws ProcessException {
final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
MergeBin merger;
@@ -314,7 +319,7 @@ public class MergeContent extends BinFiles {
// Fail the flow files and commit them
if (error != null) {
final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
- logger.error(error + "; routing {} to failure", new Object[]{binDescription});
+ getLogger().error(error + "; routing {} to failure", new Object[]{binDescription});
for ( final FlowFileSessionWrapper wrapper : binCopy ) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
wrapper.getSession().commit();
@@ -341,7 +346,7 @@ public class MergeContent extends BinFiles {
bundle = session.putAllAttributes(bundle, bundleAttributes);
final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
- logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
+ getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
session.transfer(bundle, REL_MERGED);
// We haven't committed anything, parent will take care of it