Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/02 21:53:02 UTC

[1/6] incubator-nifi git commit: NIFI-305: Refactoring superclass BinFiles from MergeContent

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop ed8f77160 -> 54e922c8f


NIFI-305: Refactoring superclass BinFiles from MergeContent


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/98afcce0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/98afcce0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/98afcce0

Branch: refs/heads/develop
Commit: 98afcce0dc3672fe04e2130ac72b9a201e17ba1a
Parents: 6b560b9
Author: gresockj <jg...@gmail.com>
Authored: Tue Jan 27 17:32:38 2015 -0500
Committer: gresockj <jg...@gmail.com>
Committed: Tue Jan 27 17:32:38 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/BinFiles.java      | 388 ++++++++++++++++++
 .../nifi/processors/standard/MergeContent.java  | 400 ++++---------------
 2 files changed, 473 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/98afcce0/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
new file mode 100644
index 0000000..7846c7d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.Bin;
+import org.apache.nifi.processors.standard.util.BinManager;
+import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
+
+/**
+ * Base class for processors that pull FlowFiles into bins and process each bin as a group; currently extended by MergeContent.
+ *
+ */
+public abstract class BinFiles extends AbstractSessionFactoryProcessor {
+
+    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
+            .name("Minimum Group Size")
+            .description("The minimum size for the bundle")
+            .required(true)
+            .defaultValue("0 B")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
+            .name("Maximum Group Size")
+            .description("The maximum size for the bundle. If not specified, there is no maximum.")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
+            .name("Minimum Number of Entries")
+            .description("The minimum number of files to include in a bundle")
+            .required(true)
+            .defaultValue("1")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
+            .name("Maximum Number of Entries")
+            .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
+            .name("Maximum number of Bins")
+            .description("Specifies the maximum number of bins that can be held in memory at any one time")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
+            .name("Max Bin Age")
+            .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure").build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+    private final BinManager binManager = new BinManager();
+
+    private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+
+    	final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
+        Set<Relationship> additionalRelationships = defineAdditionalRelationships();
+        if (additionalRelationships != null) {
+        	relationships.addAll(additionalRelationships);
+        }
+        this.relationships = Collections.unmodifiableSet(relationships);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(MIN_ENTRIES);
+        descriptors.add(MAX_ENTRIES);
+        descriptors.add(MIN_SIZE);
+        descriptors.add(MAX_SIZE);
+        descriptors.add(MAX_BIN_AGE);
+        descriptors.add(MAX_BIN_COUNT);
+        List<PropertyDescriptor> additionalPropertyDescriptors = this.defineAdditionalPropertyDescriptors();
+        if (additionalPropertyDescriptors != null) {
+        	descriptors.addAll(additionalPropertyDescriptors);
+        }
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @OnStopped
+    public void resetState() {
+        binManager.purge();
+
+        Bin bin;
+        while ((bin = readyBins.poll()) != null) {
+            for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
+                wrapper.getSession().rollback();
+            }
+        }
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    /**
+     * Allows any additional relationships to be defined.
+     * @return Relationships to be added in the init() method
+     */ 
+    protected abstract Set<Relationship> defineAdditionalRelationships();
+    
+    /**
+     * Allows any additional property descriptors to be defined.
+     * @return Properties to be added in the init() method
+     */
+    protected abstract List<PropertyDescriptor> defineAdditionalPropertyDescriptors();
+
+	/**
+	 * Allows general pre-processing of a flow file before it is offered to a
+	 * bin. This is called before getGroupId().
+	 * 
+	 * @param context the current process context
+	 * @param session the session to which the flow file belongs
+	 * @param flowFile the flow file to be pre-processed
+	 * @return The flow file, possibly altered
+	 */
+    protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
+    
+    /**
+     * Returns a group ID representing a bin.  This allows flow files to be
+     * binned into like groups.
+     * @param context the current process context
+     * @param flowFile the flow file whose group is being determined
+     * @return The appropriate group ID
+     */
+    protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
+
+    /**
+     * Performs any additional setup of the bin manager.  Called during the
+     * OnScheduled phase.
+     * @param binManager The bin manager
+     * @param context the current process context
+     */
+    protected abstract void setUpBinManager(BinManager binManager, ProcessContext context);
+    
+    /**
+	 * Processes a single bin. The implementing class is responsible for
+	 * committing each session.
+	 * 
+	 * @param unmodifiableBin
+	 *            A reference to a single bin of flow file/session wrappers
+	 * @param binContents
+	 *            A copy of the contents of the bin
+	 * @param context
+	 *            The context
+	 * @param session
+	 *            The session that created the bin
+	 * @param logger
+	 *            The logger
+	 * @return Return true if the input bin was already committed. E.g., in case of a
+	 * failure, the implementation may choose to transfer all binned files
+	 * to Failure and commit their sessions.  If false, the 
+	 * processBins() method will transfer the files to Original and commit
+	 * the sessions
+	 * @throws Exception
+	 *             This will be handled appropriately, and all flow files in the
+	 *             bin will be transferred to failure and the session rolled
+	 *             back
+	 */
+	protected abstract boolean processBin(Bin unmodifiableBin,
+			List<FlowFileSessionWrapper> binContents, ProcessContext context,
+			ProcessSession session, ProcessorLog logger) throws Exception;
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        int binsAdded = binFlowFiles(context, sessionFactory);
+        getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
+        
+        if (!isScheduled()) {
+            return;
+        }
+
+        binsAdded += migrateBins(context);
+
+        final int binsProcessed = processBins(context, sessionFactory);
+        if (binsProcessed == 0 && binsAdded == 0) {
+            context.yield();
+        }
+    }
+
+    private int migrateBins(final ProcessContext context) {
+        int added = 0;
+        for (final Bin bin : binManager.removeReadyBins(true)) {
+            this.readyBins.add(bin);
+            added++;
+        }
+
+        // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
+        // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
+        // bins. So we may as well expire it now.
+        if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
+            final Bin bin = binManager.removeOldestBin();
+            if (bin != null) {
+                added++;
+                this.readyBins.add(bin);
+            }
+        }
+
+        return added;
+    }
+
+    private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+        final Bin bin = readyBins.poll();
+        if (bin == null) {
+            return 0;
+        }
+
+        final List<Bin> bins = new ArrayList<>();
+        bins.add(bin);
+
+        final ProcessorLog logger = getLogger();
+        final ProcessSession session = sessionFactory.createSession();
+
+        final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(bin.getContents());
+
+    	boolean binAlreadyCommitted = false;
+        try {
+        	binAlreadyCommitted = this.processBin(bin, binCopy, context, session, logger);
+        } catch (final Exception e) {
+            logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
+
+            for (final FlowFileSessionWrapper wrapper : binCopy) {
+                wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
+                wrapper.getSession().commit();
+            }
+            session.rollback();
+            return 1;
+        }
+
+        // we first commit the bundle's session before the originals' sessions because if we are restarted or crash
+        // between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
+        // across multiple sessions, we cannot guarantee atomicity across the sessions
+        session.commit();
+        // If this bin's session has been committed, move on.
+        if ( !binAlreadyCommitted ) {
+            for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
+                wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
+                wrapper.getSession().commit();
+            }
+        }
+
+        return 1;
+    }
+    
+	private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+        int binsAdded = 0;
+        while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
+            if (!isScheduled()) {
+                return binsAdded;
+            }
+
+            final ProcessSession session = sessionFactory.createSession();
+            FlowFile flowFile = session.get();
+            if (flowFile == null) {
+                return binsAdded;
+            }
+
+            flowFile = this.preprocessFlowFile(context, session, flowFile);
+
+            String groupId = this.getGroupId(context, flowFile);
+
+            final boolean binned = binManager.offer(groupId, flowFile, session);
+
+            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
+            if (!binned) {
+                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
+                bin.offer(flowFile, session);
+                this.readyBins.add(bin);
+            }
+
+            binsAdded++;
+        }
+
+        return binsAdded;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
+
+        if (context.getProperty(MAX_BIN_AGE).isSet() ) {
+            binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
+        } else {
+            binManager.setMaxBinAge(Integer.MAX_VALUE);
+        }
+        
+        if ( context.getProperty(MAX_SIZE).isSet() ) {
+            binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
+        } else {
+            binManager.setMaximumSize(Long.MAX_VALUE);
+        }
+        
+        binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
+
+        if ( context.getProperty(MAX_ENTRIES).isSet() ) {
+            binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
+        } else {
+            binManager.setMaximumEntries(Integer.MAX_VALUE);
+        }
+
+        this.setUpBinManager(binManager, context);
+    }
+    
+	@Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
+
+        final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
+        final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+
+        if (maxBytes != null && maxBytes.longValue() < minBytes) {
+            problems.add(new ValidationResult.Builder().subject(MIN_SIZE.getName()).input(
+                    context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size must be less than or equal to Max Size").build());
+        }
+
+        final Long min = context.getProperty(MIN_ENTRIES).asLong();
+        final Long max = context.getProperty(MAX_ENTRIES).asLong();
+
+        if (min != null && max != null) {
+            if (min > max) {
+                problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
+            }
+        }
+
+        return problems;
+    }
+}

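For readers following the refactoring, the sketch below shows how a concrete processor could extend the new BinFiles superclass and implement its abstract hooks. It is a hypothetical illustration, not part of this commit: the class name and the grouping attribute are made up, and it targets the processBin signature as it stands in this commit (a later commit in this thread narrows that signature).

package org.apache.nifi.processors.standard;

import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.util.Bin;
import org.apache.nifi.processors.standard.util.BinManager;
import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;

// Hypothetical example only; not part of this commit.
public class BinByFilename extends BinFiles {

    @Override
    protected Set<Relationship> defineAdditionalRelationships() {
        // REL_ORIGINAL and REL_FAILURE are already added by BinFiles.init()
        return Collections.<Relationship>emptySet();
    }

    @Override
    protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() {
        // the min/max size, entry count, bin age, and bin count properties are inherited
        return Collections.<PropertyDescriptor>emptyList();
    }

    @Override
    protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
        return flowFile; // nothing to normalize before binning
    }

    @Override
    protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
        // bin together FlowFiles that share the same filename attribute
        return flowFile.getAttribute("filename");
    }

    @Override
    protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
        // no configuration beyond what BinFiles.onScheduled() already applies
    }

    @Override
    protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binContents,
            final ProcessContext context, final ProcessSession session, final ProcessorLog logger) throws Exception {
        logger.info("Processing a bin of {} FlowFiles", new Object[]{binContents.size()});
        // returning false lets BinFiles.processBins() route the originals to REL_ORIGINAL
        // and commit each wrapped session after committing 'session'
        return false;
    }
}
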
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/98afcce0/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index f2e4a8d..73cb5a6 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -23,7 +23,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -31,59 +30,47 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
-import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.Bin;
 import org.apache.nifi.processors.standard.util.BinManager;
 import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FlowFilePackager;
 import org.apache.nifi.util.FlowFilePackagerV1;
 import org.apache.nifi.util.FlowFilePackagerV2;
 import org.apache.nifi.util.FlowFilePackagerV3;
 import org.apache.nifi.util.ObjectHolder;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 
 @SideEffectFree
 @TriggerWhenEmpty
 @Tags({"merge", "content", "correlation", "tar", "zip", "stream", "concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"})
 @CapabilityDescription("Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.")
-public class MergeContent extends AbstractSessionFactoryProcessor {
+public class MergeContent extends BinFiles {
 
     // preferred attributes
     public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
@@ -207,160 +194,82 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
             .defaultValue("false")
             .build();
 
-    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
-            .name("Minimum Group Size")
-            .description("The minimum size of for the bundle")
-            .required(true)
-            .defaultValue("0 B")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
-            .name("Maximum Group Size")
-            .description("The maximum size for the bundle. If not specified, there is no maximum.")
-            .required(false)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
-            .name("Minimum Number of Entries")
-            .description("The minimum number of files to include in a bundle")
-            .required(true)
-            .defaultValue("1")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
-            .name("Maximum Number of Entries")
-            .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
-            .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
-            .name("Maximum number of Bins")
-            .description("Specifies the maximum number of bins that can be held in memory at any one time")
-            .defaultValue("100")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
-            .name("Max Bin Age")
-            .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
-            .required(false)
-            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
-            .build();
-
-    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
     public static final Relationship REL_MERGED = new Relationship.Builder().name("merged").description("The FlowFile containing the merged content").build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build();
 
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
 
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> descriptors;
-    private final BinManager binManager = new BinManager();
-
-    private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
+	@Override
+	protected Set<Relationship> defineAdditionalRelationships() {
         final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_ORIGINAL);
         relationships.add(REL_MERGED);
-        relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
+        
+        return relationships;
+    }
 
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+	@Override
+	protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() {
+		final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(MERGE_STRATEGY);
         descriptors.add(MERGE_FORMAT);
         descriptors.add(ATTRIBUTE_STRATEGY);
         descriptors.add(CORRELATION_ATTRIBUTE_NAME);
-        descriptors.add(MIN_ENTRIES);
-        descriptors.add(MAX_ENTRIES);
-        descriptors.add(MIN_SIZE);
-        descriptors.add(MAX_SIZE);
-        descriptors.add(MAX_BIN_AGE);
-        descriptors.add(MAX_BIN_COUNT);
         descriptors.add(HEADER);
         descriptors.add(FOOTER);
         descriptors.add(DEMARCATOR);
         descriptors.add(COMPRESSION_LEVEL);
         descriptors.add(KEEP_PATH);
-
-        this.descriptors = Collections.unmodifiableList(descriptors);
-    }
-
-    @OnStopped
-    public void resetState() {
-        binManager.purge();
-
-        Bin bin;
-        while ((bin = readyBins.poll()) != null) {
-            for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
-                wrapper.getSession().rollback();
-            }
-        }
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        
         return descriptors;
-    }
+	}
 
     private byte[] readContent(final String filename) throws IOException {
         return Files.readAllBytes(Paths.get(filename));
     }
 
-    
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        int binsAdded = binFlowFiles(context, sessionFactory);
-        getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
-        
-        if (!isScheduled()) {
-            return;
+
+	@Override
+	protected FlowFile preprocessFlowFile(ProcessContext context,
+			ProcessSession session, FlowFile flowFile) {
+		
+        // handle backward compatibility with old segment attributes
+        if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+            flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+        }
+        if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+            flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
         }
+        if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+            flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+        }
+        
+        return flowFile;
+	}
 
-        binsAdded += migrateBins(context);
+	@Override
+	protected String getGroupId(ProcessContext context, FlowFile flowFile) {
 
-        final int binsProcessed = processBins(context, sessionFactory);
-        if (binsProcessed == 0 && binsAdded == 0) {
-            context.yield();
-        }
-    }
-    
-
-    private int migrateBins(final ProcessContext context) {
-        int added = 0;
-        for (final Bin bin : binManager.removeReadyBins(true)) {
-            this.readyBins.add(bin);
-            added++;
-        }
-
-        // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
-        // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
-        // bins. So we may as well expire it now.
-        if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
-            final Bin bin = binManager.removeOldestBin();
-            if (bin != null) {
-                added++;
-                this.readyBins.add(bin);
-            }
+        final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
+        String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
+
+        // when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
+        if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+            groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
         }
 
-        return added;
-    }
+        return groupId;
+	}
 
-    private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
-        final Bin bin = readyBins.poll();
-        if (bin == null) {
-            return 0;
+	@Override
+	protected void setUpBinManager(BinManager binManager, ProcessContext context) {
+		if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+            binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
         }
+	}
+
+	@Override
+	protected boolean processBin(Bin unmodifiableBin,
+			List<FlowFileSessionWrapper> binCopy, ProcessContext context,
+			ProcessSession session, ProcessorLog logger) throws Exception {
 
         final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
         MergeBin merger;
@@ -398,130 +307,45 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
                 break;
         }
 
-        final List<Bin> bins = new ArrayList<>();
-        bins.add(bin);
-
-        final ProcessorLog logger = getLogger();
-        final ProcessSession session = sessionFactory.createSession();
-
-        final Set<Bin> committedBins = new HashSet<>();
-        
-        for (final Bin unmodifiableBin : bins) {
-            final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(unmodifiableBin.getContents());
-
-            if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
-                final String error = getDefragmentValidationError(binCopy);
-                if (error != null) {
-                    final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
-                    logger.error(error + "; routing {} to failure", new Object[]{binDescription});
-                    for ( final FlowFileSessionWrapper wrapper : binCopy ) {
-                        wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
-                        wrapper.getSession().commit();
-                        committedBins.add(unmodifiableBin);
-                    }
-                    
-                    continue;
-                }
-
-                Collections.sort(binCopy, new FragmentComparator());
-            }
-
-            FlowFile bundle = null;
-            try {
-                bundle = merger.merge(context, session, binCopy);
-
-                // keep the filename, as it is added to the bundle.
-                final String filename = bundle.getAttribute(CoreAttributes.FILENAME.key());
-
-                // merge all of the attributes
-                final Map<String, String> bundleAttributes = attributeStrategy.getMergedAttributes(binCopy);
-                bundleAttributes.put(CoreAttributes.MIME_TYPE.key(), merger.getMergedContentType());
-                // restore the filename of the bundle
-                bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
-                bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(binCopy.size()));
-                bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(bin.getBinAge()));
-
-                bundle = session.putAllAttributes(bundle, bundleAttributes);
-
-                final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
-                logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
-            } catch (final Exception e) {
-                logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
 
-                for (final FlowFileSessionWrapper wrapper : binCopy) {
+        if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+            final String error = getDefragmentValidationError(binCopy);
+            
+            // Fail the flow files and commit them
+            if (error != null) {
+                final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
+                logger.error(error + "; routing {} to failure", new Object[]{binDescription});
+                for ( final FlowFileSessionWrapper wrapper : binCopy ) {
                     wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
                     wrapper.getSession().commit();
                 }
-                session.rollback();
-                return 1;
-            }
-            session.transfer(bundle, REL_MERGED);
-        }
-
-        // we first commit the bundle's session before the originals' sessions because if we are restarted or crash
-        // between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
-        // across multiple sessions, we cannot guarantee atomicity across the sessions
-        session.commit();
-        for (final Bin unmodifiableBin : bins) {
-            // If this bin's session has been committed, move on.
-            if ( committedBins.contains(unmodifiableBin) ) {
-                continue;
-            }
-            
-            for (final FlowFileSessionWrapper wrapper : unmodifiableBin.getContents()) {
-                wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
-                wrapper.getSession().commit();
+                
+                return true;
             }
+            Collections.sort(binCopy, new FragmentComparator());
         }
 
-        return 1;
-    }
-
-    private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
-        int binsAdded = 0;
-        while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
-            if (!isScheduled()) {
-                return binsAdded;
-            }
-
-            final ProcessSession session = sessionFactory.createSession();
-            FlowFile flowFile = session.get();
-            if (flowFile == null) {
-                return binsAdded;
-            }
+        FlowFile bundle = merger.merge(context, session, binCopy);
 
-            // handle backward compatibility with old segment attributes
-            if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
-                flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
-            }
-            if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
-                flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
-            }
-            if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
-                flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
-            }
+        // keep the filename, as it is added to the bundle.
+        final String filename = bundle.getAttribute(CoreAttributes.FILENAME.key());
 
-            final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
-            String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
+        // merge all of the attributes
+        final Map<String, String> bundleAttributes = attributeStrategy.getMergedAttributes(binCopy);
+        bundleAttributes.put(CoreAttributes.MIME_TYPE.key(), merger.getMergedContentType());
+        // restore the filename of the bundle
+        bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(binCopy.size()));
+        bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(unmodifiableBin.getBinAge()));
 
-            // when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
-            if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
-                groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
-            }
+        bundle = session.putAllAttributes(bundle, bundleAttributes);
 
-            final boolean binned = binManager.offer(groupId, flowFile, session);
+        final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
+        logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
+        session.transfer(bundle, REL_MERGED);
 
-            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
-            if (!binned) {
-                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
-                bin.offer(flowFile, session);
-                this.readyBins.add(bin);
-            }
-
-            binsAdded++;
-        }
-
-        return binsAdded;
+        // We haven't committed anything, parent will take care of it
+        return false;
     }
 
     private String getDefragmentValidationError(final List<FlowFileSessionWrapper> bin) {
@@ -578,60 +402,6 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
         return NUMBER_PATTERN.matcher(value).matches();
     }
 
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException {
-        binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
-
-        if (context.getProperty(MAX_BIN_AGE).isSet() ) {
-            binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
-        } else {
-            binManager.setMaxBinAge(Integer.MAX_VALUE);
-        }
-        
-        if ( context.getProperty(MAX_SIZE).isSet() ) {
-            binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
-        } else {
-            binManager.setMaximumSize(Long.MAX_VALUE);
-        }
-        
-        if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
-            binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
-        } else {
-            binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
-
-            if ( context.getProperty(MAX_ENTRIES).isSet() ) {
-                binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
-            } else {
-                binManager.setMaximumEntries(Integer.MAX_VALUE);
-            }
-        }
-
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
-        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
-
-        final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
-        final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
-
-        if (maxBytes != null && maxBytes.longValue() < minBytes) {
-            problems.add(new ValidationResult.Builder().subject(MIN_SIZE.getName()).input(
-                    context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size must be less than or equal to Max Size").build());
-        }
-
-        final Long min = context.getProperty(MIN_ENTRIES).asLong();
-        final Long max = context.getProperty(MAX_ENTRIES).asLong();
-
-        if (min != null && max != null) {
-            if (min > max) {
-                problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
-            }
-        }
-
-        return problems;
-    }
-
     private class BinaryConcatenationMerge implements MergeBin {
 
         private String mimeType = "application/octet-stream";


[4/6] incubator-nifi git commit: Merge branch 'NIFI-305' of https://github.com/gresockj/incubator-nifi into develop

Posted by ma...@apache.org.
Merge branch 'NIFI-305' of https://github.com/gresockj/incubator-nifi into develop


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c01dff59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c01dff59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c01dff59

Branch: refs/heads/develop
Commit: c01dff5922239a28d02d407fb86f27f110aceeab
Parents: bafa945 615794e
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 13:23:36 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 13:23:36 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/BinFiles.java      | 405 +++++++++++++++++++
 .../nifi/processors/standard/MergeContent.java  | 400 ++++--------------
 2 files changed, 490 insertions(+), 315 deletions(-)
----------------------------------------------------------------------



[5/6] incubator-nifi git commit: NIFI-305: Slight refactorings to provide more flexibility in concrete implementations

Posted by ma...@apache.org.
NIFI-305: Slight refactorings to provide more flexibility in concrete implementations


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ec7f7e77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ec7f7e77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ec7f7e77

Branch: refs/heads/develop
Commit: ec7f7e7717750276a6cbf56edbfcca49b1299fa5
Parents: c01dff5
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 13:51:51 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 13:51:51 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/BinFiles.java      | 72 +++-----------------
 .../nifi/processors/standard/MergeContent.java  | 69 ++++++++++---------
 2 files changed, 46 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index 0a65c59..3d7dba1 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -19,11 +19,8 @@ package org.apache.nifi.processors.standard;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -39,7 +36,6 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -99,38 +95,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
     public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure").build();
 
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> descriptors;
     private final BinManager binManager = new BinManager();
-
     private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
 
-    @Override
-    protected final void init(final ProcessorInitializationContext context) {
-
-    	final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_ORIGINAL);
-        relationships.add(REL_FAILURE);
-        Set<Relationship> additionalRelationships = defineAdditionalRelationships();
-        if (additionalRelationships != null) {
-        	relationships.addAll(additionalRelationships);
-        }
-        this.relationships = Collections.unmodifiableSet(relationships);
-
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(MIN_ENTRIES);
-        descriptors.add(MAX_ENTRIES);
-        descriptors.add(MIN_SIZE);
-        descriptors.add(MAX_SIZE);
-        descriptors.add(MAX_BIN_AGE);
-        descriptors.add(MAX_BIN_COUNT);
-        List<PropertyDescriptor> additionalPropertyDescriptors = this.defineAdditionalPropertyDescriptors();
-        if (additionalPropertyDescriptors != null) {
-        	descriptors.addAll(additionalPropertyDescriptors);
-        }
-
-        this.descriptors = Collections.unmodifiableList(descriptors);
-    }
 
     @OnStopped
     public final void resetState() {
@@ -144,27 +111,6 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
         }
     }
 
-    @Override
-    public final Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
-    }
-
-    /**
-     * Allows any additional relationships to be defined.
-     * @return Relationships to be added in the init() method
-     */ 
-    protected abstract Set<Relationship> defineAdditionalRelationships();
-    
-    /**
-     * Allows any additional property descriptors to be defined.
-     * @return Properties to be added in the init() method
-     */
-    protected abstract List<PropertyDescriptor> defineAdditionalPropertyDescriptors();
 
 	/**
 	 * Allows general pre-processing of a flow file before it is offered to a
@@ -213,14 +159,14 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
 	 * to Failure and commit their sessions.  If false, the 
 	 * processBins() method will transfer the files to Original and commit
 	 * the sessions
-	 * @throws Exception
-	 *             This will be handled appropriately, and all flow files in the
-	 *             bin will be transferred to failure and the session rolled
-	 *             back
+	 * 
+	 * @throws ProcessException if any problem arises while processing a bin
+	 *             of FlowFiles. All flow files in the
+	 *             bin will be transferred to failure and the ProcessSession provided by
+	 *             the 'session' argument rolled back
 	 */
-	protected abstract boolean processBin(Bin unmodifiableBin,
-			List<FlowFileSessionWrapper> binContents, ProcessContext context,
-			ProcessSession session, ProcessorLog logger) throws Exception;
+	protected abstract boolean processBin(Bin unmodifiableBin, 
+	        List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException;
 
     /**
 	 * Allows additional custom validation to be done. This will be called from
@@ -288,8 +234,8 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
 
     	boolean binAlreadyCommitted = false;
         try {
-        	binAlreadyCommitted = this.processBin(bin, binCopy, context, session, logger);
-        } catch (final Exception e) {
+        	binAlreadyCommitted = this.processBin(bin, binCopy, context, session);
+        } catch (final ProcessException e) {
             logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
 
             for (final FlowFileSessionWrapper wrapper : binCopy) {

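After this change, a concrete implementation overrides the narrower signature, uses getLogger() in place of the removed ProcessorLog parameter, and reports failures as ProcessException. A hypothetical override adjusted to the new contract might look like the following sketch (the actual merge logic is elided):

@Override
protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binContents,
        final ProcessContext context, final ProcessSession session) throws ProcessException {
    try {
        getLogger().info("Processing a bin of {} FlowFiles", new Object[]{binContents.size()});
        // ... merge or otherwise handle binContents here ...
        return false; // BinFiles routes the originals to REL_ORIGINAL and commits their sessions
    } catch (final Exception e) {
        // wrap any failure so BinFiles can route the bin to failure and roll back 'session'
        throw new ProcessException("Failed to process bin", e);
    }
}
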
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 73cb5a6..a78bc07 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -46,10 +46,10 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -198,56 +198,62 @@ public class MergeContent extends BinFiles {
 
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
 
+
 	@Override
-	protected Set<Relationship> defineAdditionalRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
+	public Set<Relationship> getRelationships() {
+	    final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
         relationships.add(REL_MERGED);
-        
         return relationships;
-    }
-
+	}
+	
+	
 	@Override
-	protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() {
-		final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(MERGE_STRATEGY);
-        descriptors.add(MERGE_FORMAT);
-        descriptors.add(ATTRIBUTE_STRATEGY);
-        descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+	    final List<PropertyDescriptor> descriptors = new ArrayList<>();
+	    descriptors.add(MERGE_STRATEGY);
+	    descriptors.add(MERGE_FORMAT);
+	    descriptors.add(ATTRIBUTE_STRATEGY);
+	    descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+        descriptors.add(MIN_ENTRIES);
+        descriptors.add(MAX_ENTRIES);
+        descriptors.add(MIN_SIZE);
+        descriptors.add(MAX_SIZE);
+        descriptors.add(MAX_BIN_AGE);
+        descriptors.add(MAX_BIN_COUNT);
         descriptors.add(HEADER);
         descriptors.add(FOOTER);
         descriptors.add(DEMARCATOR);
         descriptors.add(COMPRESSION_LEVEL);
         descriptors.add(KEEP_PATH);
-        
         return descriptors;
 	}
-
+	
     private byte[] readContent(final String filename) throws IOException {
         return Files.readAllBytes(Paths.get(filename));
     }
 
 
 	@Override
-	protected FlowFile preprocessFlowFile(ProcessContext context,
-			ProcessSession session, FlowFile flowFile) {
-		
+	protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+	    FlowFile processed = flowFile;
         // handle backward compatibility with old segment attributes
-        if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
-            flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+        if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+            processed = session.putAttribute(processed, FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
         }
-        if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
-            flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
+        if (processed.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+            processed = session.putAttribute(processed, FRAGMENT_INDEX_ATTRIBUTE, processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
         }
-        if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
-            flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+        if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+            processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE, processed.getAttribute(SEGMENT_ID_ATTRIBUTE));
         }
         
-        return flowFile;
+        return processed;
 	}
 
 	@Override
-	protected String getGroupId(ProcessContext context, FlowFile flowFile) {
-
+	protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
         final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
         String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
 
@@ -260,16 +266,15 @@ public class MergeContent extends BinFiles {
 	}
 
 	@Override
-	protected void setUpBinManager(BinManager binManager, ProcessContext context) {
+	protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
 		if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
             binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
         }
 	}
 
 	@Override
-	protected boolean processBin(Bin unmodifiableBin,
-			List<FlowFileSessionWrapper> binCopy, ProcessContext context,
-			ProcessSession session, ProcessorLog logger) throws Exception {
+	protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy, final ProcessContext context,
+			final ProcessSession session) throws ProcessException {
 
         final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
         MergeBin merger;
@@ -314,7 +319,7 @@ public class MergeContent extends BinFiles {
             // Fail the flow files and commit them
             if (error != null) {
                 final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
-                logger.error(error + "; routing {} to failure", new Object[]{binDescription});
+                getLogger().error(error + "; routing {} to failure", new Object[]{binDescription});
                 for ( final FlowFileSessionWrapper wrapper : binCopy ) {
                     wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
                     wrapper.getSession().commit();
@@ -341,7 +346,7 @@ public class MergeContent extends BinFiles {
         bundle = session.putAllAttributes(bundle, bundleAttributes);
 
         final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
-        logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
+        getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
         session.transfer(bundle, REL_MERGED);
 
         // We haven't committed anything, parent will take care of it
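
The MergeContent hunks above show the shape of the refactoring: the binning loop now lives in BinFiles, and MergeContent only supplies the merge-specific hooks (preprocessFlowFile, getGroupId, setUpBinManager, processBin). As a rough illustration of what another subclass would provide, here is a minimal sketch written against the signatures shown above. The class name SimpleBinner and its trivial method bodies are hypothetical, BinFiles may declare further hooks not visible in these hunks, and the Bin import path is assumed to match BinManager's; returning false from processBin follows the convention noted in the last hunk that the parent commits the wrapped sessions.

    package org.apache.nifi.processors.standard;

    import java.util.List;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.processors.standard.util.Bin;   // assumed to sit alongside BinManager
    import org.apache.nifi.processors.standard.util.BinManager;
    import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;

    public class SimpleBinner extends BinFiles {

        @Override
        protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
            // Nothing to normalize before binning in this sketch.
            return flowFile;
        }

        @Override
        protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
            // Bin by filename; a real processor would usually key off a configurable attribute.
            return flowFile.getAttribute("filename");
        }

        @Override
        protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
            // No extra BinManager configuration needed for this example.
        }

        @Override
        protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy,
                final ProcessContext context, final ProcessSession session) throws ProcessException {
            // Route every FlowFile in the bin back to 'original'. Returning false signals that
            // this method did not commit the wrapped sessions, so the parent class will.
            for (final FlowFileSessionWrapper wrapper : binCopy) {
                wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
            }
            return false;
        }
    }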


[2/6] incubator-nifi git commit: NIFI-305: Minor documentation update

NIFI-305: Minor documentation update


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ad409034
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ad409034
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ad409034

Branch: refs/heads/develop
Commit: ad40903458cd2bbf98284d693a057519d8bbf775
Parents: 98afcce
Author: gresockj <jg...@gmail.com>
Authored: Tue Jan 27 17:51:16 2015 -0500
Committer: gresockj <jg...@gmail.com>
Committed: Tue Jan 27 17:51:16 2015 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/processors/standard/BinFiles.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ad409034/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index 7846c7d..b838d51 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -48,7 +48,7 @@ import org.apache.nifi.processors.standard.util.BinManager;
 import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
 
 /**
- * Base class for MergeContent.
+ * Base class for file-binning processors, including MergeContent.
  *
  */
 public abstract class BinFiles extends AbstractSessionFactoryProcessor {


[3/6] incubator-nifi git commit: NIFI-305: Cleaning up for extensibility; final methods

NIFI-305: Cleaning up for extensibility; final methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/615794e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/615794e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/615794e7

Branch: refs/heads/develop
Commit: 615794e77fafc79ec6027484e584f76695a89573
Parents: ad40903
Author: gresockj <jg...@gmail.com>
Authored: Thu Jan 29 17:11:56 2015 -0500
Committer: gresockj <jg...@gmail.com>
Committed: Thu Jan 29 17:11:56 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/BinFiles.java      | 31 +++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/615794e7/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index b838d51..0a65c59 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -106,7 +106,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
     private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
 
     @Override
-    protected void init(final ProcessorInitializationContext context) {
+    protected final void init(final ProcessorInitializationContext context) {
 
     	final Set<Relationship> relationships = new HashSet<>();
         relationships.add(REL_ORIGINAL);
@@ -133,7 +133,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
     }
 
     @OnStopped
-    public void resetState() {
+    public final void resetState() {
         binManager.purge();
 
         Bin bin;
@@ -145,12 +145,12 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
     }
 
     @Override
-    public Set<Relationship> getRelationships() {
+    public final Set<Relationship> getRelationships() {
         return relationships;
     }
 
     @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return descriptors;
     }
 
@@ -222,8 +222,20 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
 			List<FlowFileSessionWrapper> binContents, ProcessContext context,
 			ProcessSession session, ProcessorLog logger) throws Exception;
 
+    /**
+	 * Allows additional custom validation to be done. This will be called from
+	 * the parent's customValidation method.
+	 * 
+	 * @param context
+	 *            The context
+	 * @return Validation results indicating problems
+	 */
+    protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
+    	return new ArrayList<ValidationResult>();
+    }
+	
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
         int binsAdded = binFlowFiles(context, sessionFactory);
         getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
         
@@ -336,7 +348,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
     }
 
     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException {
+    public final void onScheduled(final ProcessContext context) throws IOException {
         binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
 
         if (context.getProperty(MAX_BIN_AGE).isSet() ) {
@@ -363,7 +375,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
     }
     
 	@Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+    protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
         final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
 
         final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
@@ -382,6 +394,11 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
                 problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
             }
         }
+        
+        Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
+        if (otherProblems != null) {
+        	problems.addAll(otherProblems);
+        }
 
         return problems;
     }
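
With customValidate now final, a BinFiles subclass contributes its own checks by overriding the new additionalCustomValidation hook rather than customValidate itself. The method-level sketch below shows one way a subclass such as MergeContent could use it, reusing the property descriptors and builder calls that appear in the diffs in this thread (imports for Collection, ArrayList, ValidationContext, and ValidationResult are assumed from the enclosing class); the specific rule enforced, requiring a correlation attribute when defragmenting, is invented purely for illustration and is not part of these commits.

    @Override
    protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
        final Collection<ValidationResult> problems = new ArrayList<>();

        // Hypothetical rule: when the Defragment merge strategy is selected, insist that a
        // correlation attribute has been configured.
        if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())
                && context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue() == null) {
            problems.add(new ValidationResult.Builder()
                    .subject(CORRELATION_ATTRIBUTE_NAME.getName())
                    .valid(false)
                    .explanation("a correlation attribute must be configured when defragmenting")
                    .build());
        }

        return problems;
    }

Returning an empty collection rather than null keeps the parent's null check in customValidate a no-op, matching the default implementation added above.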


[6/6] incubator-nifi git commit: Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop

Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/54e922c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/54e922c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/54e922c8

Branch: refs/heads/develop
Commit: 54e922c8f5740cab723d16b66d12b2a753c242ed
Parents: ec7f7e7 ed8f771
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 15:52:26 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 15:52:26 2015 -0500

----------------------------------------------------------------------
 nifi-nar-maven-plugin/pom.xml                   |   2 +-
 nifi/nifi-api/pom.xml                           |   4 +-
 nifi/nifi-assembly/pom.xml                      |   6 +-
 nifi/nifi-bootstrap/pom.xml                     |   2 +-
 .../nifi-data-provenance-utils/pom.xml          |   4 +-
 .../nifi-expression-language/pom.xml            |   4 +-
 .../nifi-commons/nifi-flowfile-packager/pom.xml |   4 +-
 nifi/nifi-commons/nifi-logging-utils/pom.xml    |   4 +-
 .../nifi-processor-utilities/pom.xml            |   4 +-
 nifi/nifi-commons/nifi-properties/pom.xml       |   4 +-
 nifi/nifi-commons/nifi-security-utils/pom.xml   |   4 +-
 nifi/nifi-commons/nifi-socket-utils/pom.xml     |   4 +-
 nifi/nifi-commons/nifi-utils/pom.xml            |   4 +-
 nifi/nifi-commons/nifi-web-utils/pom.xml        |   4 +-
 nifi/nifi-commons/nifi-write-ahead-log/pom.xml  |   4 +-
 nifi/nifi-commons/pom.xml                       |   5 +-
 nifi/nifi-docs/pom.xml                          |   6 +-
 .../nifi-processor-bundle-archetype/pom.xml     |   2 +-
 nifi/nifi-maven-archetypes/pom.xml              |   2 +-
 nifi/nifi-mock/pom.xml                          |   4 +-
 .../nifi-framework-nar/pom.xml                  |   4 +-
 .../nifi-framework/nifi-administration/pom.xml  |   4 +-
 .../nifi-framework/nifi-client-dto/pom.xml      |   2 +-
 .../nifi-cluster-authorization-provider/pom.xml |   2 +-
 .../nifi-cluster-protocol/pom.xml               |   2 +-
 .../nifi-framework/nifi-cluster-web/pom.xml     |   2 +-
 .../nifi-framework/nifi-cluster/pom.xml         |   2 +-
 .../nifi-file-authorization-provider/pom.xml    |   4 +-
 .../nifi-framework-core-api/pom.xml             |   4 +-
 .../nifi-framework/nifi-framework-core/pom.xml  |   2 +-
 .../nifi-framework/nifi-nar-utils/pom.xml       |   2 +-
 .../nifi-framework/nifi-resources/pom.xml       |   2 +-
 .../src/main/resources/conf/logback.xml         |   4 +-
 .../nifi-framework/nifi-runtime/pom.xml         |   2 +-
 .../nifi-framework/nifi-security/pom.xml        |   4 +-
 .../nifi-framework/nifi-site-to-site/pom.xml    |   4 +-
 .../nifi-framework/nifi-user-actions/pom.xml    |   4 +-
 .../nifi-web/nifi-custom-ui-utilities/pom.xml   |   2 +-
 .../nifi-framework/nifi-web/nifi-jetty/pom.xml  |   2 +-
 .../nifi-web/nifi-web-api/pom.xml               |   4 +-
 .../nifi/web/controller/ControllerFacade.java   |  15 +-
 .../nifi-web/nifi-web-docs/pom.xml              |   2 +-
 .../nifi-web/nifi-web-error/pom.xml             |   2 +-
 .../nifi-web-optimistic-locking/pom.xml         |   2 +-
 .../nifi-web/nifi-web-security/pom.xml          |   2 +-
 .../nifi-framework/nifi-web/nifi-web-ui/pom.xml |   2 +-
 .../src/main/webapp/css/provenance.css          |   5 +
 .../webapp/js/nf/cluster/nf-cluster-table.js    | 319 ++++++-----
 .../webapp/js/nf/counters/nf-counters-table.js  |  63 ++-
 .../webapp/js/nf/history/nf-history-table.js    | 147 +++---
 .../js/nf/provenance/nf-provenance-table.js     | 103 ++--
 .../webapp/js/nf/summary/nf-summary-table.js    | 523 +++++++------------
 .../src/main/webapp/js/nf/summary/nf-summary.js |   6 -
 .../js/nf/templates/nf-templates-table.js       |  73 ++-
 .../main/webapp/js/nf/users/nf-users-table.js   | 440 ++++++++--------
 .../nifi-framework/nifi-web/pom.xml             |  10 +-
 .../nifi-framework/pom.xml                      |   4 +-
 .../nifi-framework-bundle/pom.xml               |  36 +-
 .../nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml  |   4 +-
 .../nifi-hdfs-processors/pom.xml                |   2 +-
 .../nifi-nar-bundles/nifi-hadoop-bundle/pom.xml |   6 +-
 .../nifi-hadoop-libraries-nar/pom.xml           |   4 +-
 .../nifi-hadoop-libraries-bundle/pom.xml        |   4 +-
 nifi/nifi-nar-bundles/nifi-jetty-bundle/pom.xml |   4 +-
 .../nifi-kafka-bundle/nifi-kafka-nar/pom.xml    |   4 +-
 .../nifi-kafka-processors/pom.xml               |   2 +-
 nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml |   4 +-
 .../pom.xml                                     |   2 +-
 .../nifi-provenance-repository-nar/pom.xml      |   4 +-
 .../nifi-volatile-provenance-repository/pom.xml |   2 +-
 .../nifi-provenance-repository-bundle/pom.xml   |   8 +-
 .../nifi-standard-nar/pom.xml                   |   4 +-
 .../nifi-standard-prioritizers/pom.xml          |   2 +-
 .../nifi-standard-processors/pom.xml            |   2 +-
 .../nifi-standard-reporting-tasks/pom.xml       |   2 +-
 .../nifi-standard-bundle/pom.xml                |  10 +-
 .../pom.xml                                     |   2 +-
 .../pom.xml                                     |   2 +-
 .../nifi-distributed-cache-protocol/pom.xml     |   2 +-
 .../nifi-distributed-cache-server/pom.xml       |   2 +-
 .../nifi-distributed-cache-services-nar/pom.xml |   4 +-
 .../pom.xml                                     |   4 +-
 .../nifi-load-distribution-service-api/pom.xml  |   4 +-
 .../nifi-ssl-context-nar/pom.xml                |   4 +-
 .../nifi-ssl-context-service/pom.xml            |   2 +-
 .../nifi-ssl-context-bundle/pom.xml             |   4 +-
 .../nifi-ssl-context-service-api/pom.xml        |   2 +-
 .../nifi-standard-services-api-nar/pom.xml      |   4 +-
 .../nifi-standard-services/pom.xml              |   4 +-
 .../nifi-update-attribute-model/pom.xml         |   2 +-
 .../nifi-update-attribute-nar/pom.xml           |   4 +-
 .../nifi-update-attribute-processor/pom.xml     |   2 +-
 .../nifi-update-attribute-ui/pom.xml            |   2 +-
 .../update/attributes/api/RuleResource.java     |  16 +-
 .../nifi-update-attribute-bundle/pom.xml        |  10 +-
 nifi/nifi-nar-bundles/pom.xml                   |  31 +-
 nifi/pom.xml                                    |  64 +--
 97 files changed, 990 insertions(+), 1140 deletions(-)
----------------------------------------------------------------------