You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/02 21:53:06 UTC

[5/6] incubator-nifi git commit: NIFI-305: Slight refactorings to provide more flexibility in concrete implementations

NIFI-305: Slight refactorings to provide more flexibility in concrete implementations


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ec7f7e77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ec7f7e77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ec7f7e77

Branch: refs/heads/develop
Commit: ec7f7e7717750276a6cbf56edbfcca49b1299fa5
Parents: c01dff5
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 13:51:51 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 13:51:51 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/BinFiles.java      | 72 +++-----------------
 .../nifi/processors/standard/MergeContent.java  | 69 ++++++++++---------
 2 files changed, 46 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index 0a65c59..3d7dba1 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -19,11 +19,8 @@ package org.apache.nifi.processors.standard;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -39,7 +36,6 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -99,38 +95,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
     public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build();
 
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> descriptors;
     private final BinManager binManager = new BinManager();
-
     private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
 
-    @Override
-    protected final void init(final ProcessorInitializationContext context) {
-
-    	final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_ORIGINAL);
-        relationships.add(REL_FAILURE);
-        Set<Relationship> additionalRelationships = defineAdditionalRelationships();
-        if (additionalRelationships != null) {
-        	relationships.addAll(additionalRelationships);
-        }
-        this.relationships = Collections.unmodifiableSet(relationships);
-
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(MIN_ENTRIES);
-        descriptors.add(MAX_ENTRIES);
-        descriptors.add(MIN_SIZE);
-        descriptors.add(MAX_SIZE);
-        descriptors.add(MAX_BIN_AGE);
-        descriptors.add(MAX_BIN_COUNT);
-        List<PropertyDescriptor> additionalPropertyDescriptors = this.defineAdditionalPropertyDescriptors();
-        if (additionalPropertyDescriptors != null) {
-        	descriptors.addAll(additionalPropertyDescriptors);
-        }
-
-        this.descriptors = Collections.unmodifiableList(descriptors);
-    }
 
     @OnStopped
     public final void resetState() {
@@ -144,27 +111,6 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
         }
     }
 
-    @Override
-    public final Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
-    }
-
-    /**
-     * Allows any additional relationships to be defined.
-     * @return Relationships to be added in the init() method
-     */ 
-    protected abstract Set<Relationship> defineAdditionalRelationships();
-    
-    /**
-     * Allows any additional property descriptors to be defined.
-     * @return Properties to be added in the init() method
-     */
-    protected abstract List<PropertyDescriptor> defineAdditionalPropertyDescriptors();
 
 	/**
 	 * Allows general pre-processing of a flow file before it is offered to a
@@ -213,14 +159,14 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
 	 * to Failure and commit their sessions.  If false, the 
 	 * processBins() method will transfer the files to Original and commit
 	 * the sessions
-	 * @throws Exception
-	 *             This will be handled appropriately, and all flow files in the
-	 *             bin will be transferred to failure and the session rolled
-	 *             back
+	 * 
+	 * @throws ProcessException if any problem arises while processing a bin
+	 *             of FlowFiles. All flow files in the
+	 *             bin will be transferred to failure and the ProcessSession provided by
+	 *             the 'session' argument rolled back
 	 */
-	protected abstract boolean processBin(Bin unmodifiableBin,
-			List<FlowFileSessionWrapper> binContents, ProcessContext context,
-			ProcessSession session, ProcessorLog logger) throws Exception;
+	protected abstract boolean processBin(Bin unmodifiableBin, 
+	        List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException;
 
     /**
 	 * Allows additional custom validation to be done. This will be called from
@@ -288,8 +234,8 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
 
     	boolean binAlreadyCommitted = false;
         try {
-        	binAlreadyCommitted = this.processBin(bin, binCopy, context, session, logger);
-        } catch (final Exception e) {
+        	binAlreadyCommitted = this.processBin(bin, binCopy, context, session);
+        } catch (final ProcessException e) {
             logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
 
             for (final FlowFileSessionWrapper wrapper : binCopy) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 73cb5a6..a78bc07 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -46,10 +46,10 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -198,56 +198,62 @@ public class MergeContent extends BinFiles {
 
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
 
+
 	@Override
-	protected Set<Relationship> defineAdditionalRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
+	public Set<Relationship> getRelationships() {
+	    final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
         relationships.add(REL_MERGED);
-        
         return relationships;
-    }
-
+	}
+	
+	
 	@Override
-	protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() {
-		final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(MERGE_STRATEGY);
-        descriptors.add(MERGE_FORMAT);
-        descriptors.add(ATTRIBUTE_STRATEGY);
-        descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+	    final List<PropertyDescriptor> descriptors = new ArrayList<>();
+	    descriptors.add(MERGE_STRATEGY);
+	    descriptors.add(MERGE_FORMAT);
+	    descriptors.add(ATTRIBUTE_STRATEGY);
+	    descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+        descriptors.add(MIN_ENTRIES);
+        descriptors.add(MAX_ENTRIES);
+        descriptors.add(MIN_SIZE);
+        descriptors.add(MAX_SIZE);
+        descriptors.add(MAX_BIN_AGE);
+        descriptors.add(MAX_BIN_COUNT);
         descriptors.add(HEADER);
         descriptors.add(FOOTER);
         descriptors.add(DEMARCATOR);
         descriptors.add(COMPRESSION_LEVEL);
         descriptors.add(KEEP_PATH);
-        
         return descriptors;
 	}
-
+	
     private byte[] readContent(final String filename) throws IOException {
         return Files.readAllBytes(Paths.get(filename));
     }
 
 
 	@Override
-	protected FlowFile preprocessFlowFile(ProcessContext context,
-			ProcessSession session, FlowFile flowFile) {
-		
+	protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+	    FlowFile processed = flowFile;
         // handle backward compatibility with old segment attributes
-        if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
-            flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+        if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+            processed = session.putAttribute(processed, FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
         }
-        if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
-            flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
+        if (processed.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+            processed = session.putAttribute(processed, FRAGMENT_INDEX_ATTRIBUTE, processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
         }
-        if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
-            flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+        if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+            processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE, processed.getAttribute(SEGMENT_ID_ATTRIBUTE));
         }
         
-        return flowFile;
+        return processed;
 	}
 
 	@Override
-	protected String getGroupId(ProcessContext context, FlowFile flowFile) {
-
+	protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
         final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
         String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
 
@@ -260,16 +266,15 @@ public class MergeContent extends BinFiles {
 	}
 
 	@Override
-	protected void setUpBinManager(BinManager binManager, ProcessContext context) {
+	protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
 		if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
             binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
         }
 	}
 
 	@Override
-	protected boolean processBin(Bin unmodifiableBin,
-			List<FlowFileSessionWrapper> binCopy, ProcessContext context,
-			ProcessSession session, ProcessorLog logger) throws Exception {
+	protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy, final ProcessContext context,
+			final ProcessSession session) throws ProcessException {
 
         final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
         MergeBin merger;
@@ -314,7 +319,7 @@ public class MergeContent extends BinFiles {
             // Fail the flow files and commit them
             if (error != null) {
                 final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
-                logger.error(error + "; routing {} to failure", new Object[]{binDescription});
+                getLogger().error(error + "; routing {} to failure", new Object[]{binDescription});
                 for ( final FlowFileSessionWrapper wrapper : binCopy ) {
                     wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
                     wrapper.getSession().commit();
@@ -341,7 +346,7 @@ public class MergeContent extends BinFiles {
         bundle = session.putAllAttributes(bundle, bundleAttributes);
 
         final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
-        logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
+        getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
         session.transfer(bundle, REL_MERGED);
 
         // We haven't committed anything, parent will take care of it