You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/02 21:53:02 UTC
[1/6] incubator-nifi git commit: NIFI-305: Refactoring superclass
BinFiles from MergeContent
Repository: incubator-nifi
Updated Branches:
refs/heads/develop ed8f77160 -> 54e922c8f
NIFI-305: Refactoring superclass BinFiles from MergeContent
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/98afcce0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/98afcce0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/98afcce0
Branch: refs/heads/develop
Commit: 98afcce0dc3672fe04e2130ac72b9a201e17ba1a
Parents: 6b560b9
Author: gresockj <jg...@gmail.com>
Authored: Tue Jan 27 17:32:38 2015 -0500
Committer: gresockj <jg...@gmail.com>
Committed: Tue Jan 27 17:32:38 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/BinFiles.java | 388 ++++++++++++++++++
.../nifi/processors/standard/MergeContent.java | 400 ++++---------------
2 files changed, 473 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/98afcce0/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
new file mode 100644
index 0000000..7846c7d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.Bin;
+import org.apache.nifi.processors.standard.util.BinManager;
+import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
+
+/**
+ * Base class for processors, such as MergeContent, that group FlowFiles into bins.
+ *
+ */
+public abstract class BinFiles extends AbstractSessionFactoryProcessor {
+
+ public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
+ .name("Minimum Group Size")
+ .description("The minimum size of for the bundle")
+ .required(true)
+ .defaultValue("0 B")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
+ .name("Maximum Group Size")
+ .description("The maximum size for the bundle. If not specified, there is no maximum.")
+ .required(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
+ .name("Minimum Number of Entries")
+ .description("The minimum number of files to include in a bundle")
+ .required(true)
+ .defaultValue("1")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
+ .name("Maximum Number of Entries")
+ .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
+ .name("Maximum number of Bins")
+ .description("Specifies the maximum number of bins that can be held in memory at any one time")
+ .defaultValue("100")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
+ .name("Max Bin Age")
+ .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
+ .required(false)
+ .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build();
+
+ private Set<Relationship> relationships;
+ private List<PropertyDescriptor> descriptors;
+ private final BinManager binManager = new BinManager();
+
+ private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_ORIGINAL);
+ relationships.add(REL_FAILURE);
+ Set<Relationship> additionalRelationships = defineAdditionalRelationships();
+ if (additionalRelationships != null) {
+ relationships.addAll(additionalRelationships);
+ }
+ this.relationships = Collections.unmodifiableSet(relationships);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(MIN_ENTRIES);
+ descriptors.add(MAX_ENTRIES);
+ descriptors.add(MIN_SIZE);
+ descriptors.add(MAX_SIZE);
+ descriptors.add(MAX_BIN_AGE);
+ descriptors.add(MAX_BIN_COUNT);
+ List<PropertyDescriptor> additionalPropertyDescriptors = this.defineAdditionalPropertyDescriptors();
+ if (additionalPropertyDescriptors != null) {
+ descriptors.addAll(additionalPropertyDescriptors);
+ }
+
+ this.descriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ @OnStopped
+ public void resetState() {
+ binManager.purge();
+
+ Bin bin;
+ while ((bin = readyBins.poll()) != null) {
+ for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
+ wrapper.getSession().rollback();
+ }
+ }
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ /**
+ * Allows any additional relationships to be defined.
+ * @return Relationships to be added in the init() method
+ */
+ protected abstract Set<Relationship> defineAdditionalRelationships();
+
+ /**
+ * Allows any additional property descriptors to be defined.
+ * @return Properties to be added in the init() method
+ */
+ protected abstract List<PropertyDescriptor> defineAdditionalPropertyDescriptors();
+
+ /**
+ * Allows general pre-processing of a flow file before it is offered to a
+ * bin. This is called before getGroupId().
+ *
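+ * @param context the process context passed to onTrigger()
+ * @param session the session from which the flow file was obtained
+ * @param flowFile the flow file about to be offered to a bin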
+ * @return The flow file, possibly altered
+ */
+ protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
+
+ /**
+ * Returns a group ID representing a bin. This allows flow files to be
+ * binned into like groups.
+ * @param context the process context
+ * @param flowFile the flow file, as returned by preprocessFlowFile()
+ * @return The appropriate group ID
+ */
+ protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
+
+ /**
+ * Performs any additional setup of the bin manager. Called during the
+ * OnScheduled phase.
+ * @param binManager The bin manager
+ * @param context the process context supplied to the OnScheduled method
+ */
+ protected abstract void setUpBinManager(BinManager binManager, ProcessContext context);
+
+ /**
+ * Processes a single bin. An implementation that commits the sessions of the
+ * binned flow files itself must return true; otherwise processBins() commits them.
+ *
+ * @param unmodifiableBin
+ * A reference to a single bin of flow file/session wrappers
+ * @param binContents
+ * A copy of the contents of the bin
+ * @param context
+ * The context
+ * @param session
+ * A new session created for processing this bin; processBins() commits it on success
+ * @param logger
+ * The logger
+ * @return Return true if the input bin was already committed. E.g., in case of a
+ * failure, the implementation may choose to transfer all binned files
+ * to Failure and commit their sessions. If false, the
+ * processBins() method will transfer the files to Original and commit
+ * the sessions
+ * @throws Exception
+ * This will be handled appropriately: all flow files in the
+ * bin will be transferred to failure and their sessions committed,
+ * and the session passed to this method rolled back
+ */
+ protected abstract boolean processBin(Bin unmodifiableBin,
+ List<FlowFileSessionWrapper> binContents, ProcessContext context,
+ ProcessSession session, ProcessorLog logger) throws Exception;
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ int binsAdded = binFlowFiles(context, sessionFactory);
+ getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
+
+ if (!isScheduled()) {
+ return;
+ }
+
+ binsAdded += migrateBins(context);
+
+ final int binsProcessed = processBins(context, sessionFactory);
+ if (binsProcessed == 0 && binsAdded == 0) {
+ context.yield();
+ }
+ }
+
+ private int migrateBins(final ProcessContext context) {
+ int added = 0;
+ for (final Bin bin : binManager.removeReadyBins(true)) {
+ this.readyBins.add(bin);
+ added++;
+ }
+
+ // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
+ // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
+ // bins. So we may as well expire it now.
+ if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
+ final Bin bin = binManager.removeOldestBin();
+ if (bin != null) {
+ added++;
+ this.readyBins.add(bin);
+ }
+ }
+
+ return added;
+ }
+
+ private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+ final Bin bin = readyBins.poll();
+ if (bin == null) {
+ return 0;
+ }
+
+ final List<Bin> bins = new ArrayList<>();
+ bins.add(bin);
+
+ final ProcessorLog logger = getLogger();
+ final ProcessSession session = sessionFactory.createSession();
+
+ final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(bin.getContents());
+
+ boolean binAlreadyCommitted = false;
+ try {
+ binAlreadyCommitted = this.processBin(bin, binCopy, context, session, logger);
+ } catch (final Exception e) {
+ logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
+
+ for (final FlowFileSessionWrapper wrapper : binCopy) {
+ wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
+ wrapper.getSession().commit();
+ }
+ session.rollback();
+ return 1;
+ }
+
+ // we first commit the bundle's session before the originals' sessions because if we are restarted or crash
+ // between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
+ // across multiple sessions, we cannot guarantee atomicity across the sessions
+ session.commit();
+ // If this bin's session has been committed, move on.
+ if ( !binAlreadyCommitted ) {
+ for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
+ wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
+ wrapper.getSession().commit();
+ }
+ }
+
+ return 1;
+ }
+
+ private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+ int binsAdded = 0;
+ while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
+ if (!isScheduled()) {
+ return binsAdded;
+ }
+
+ final ProcessSession session = sessionFactory.createSession();
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return binsAdded;
+ }
+
+ flowFile = this.preprocessFlowFile(context, session, flowFile);
+
+ String groupId = this.getGroupId(context, flowFile);
+
+ final boolean binned = binManager.offer(groupId, flowFile, session);
+
+ // could not be added to a bin -- probably too large by itself, so create a separate bin for just this FlowFile.
+ if (!binned) {
+ Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
+ bin.offer(flowFile, session);
+ this.readyBins.add(bin);
+ }
+
+ binsAdded++;
+ }
+
+ return binsAdded;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws IOException {
+ binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
+
+ if (context.getProperty(MAX_BIN_AGE).isSet() ) {
+ binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
+ } else {
+ binManager.setMaxBinAge(Integer.MAX_VALUE);
+ }
+
+ if ( context.getProperty(MAX_SIZE).isSet() ) {
+ binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
+ } else {
+ binManager.setMaximumSize(Long.MAX_VALUE);
+ }
+
+ binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
+
+ if ( context.getProperty(MAX_ENTRIES).isSet() ) {
+ binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
+ } else {
+ binManager.setMaximumEntries(Integer.MAX_VALUE);
+ }
+
+ this.setUpBinManager(binManager, context);
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+ final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
+
+ final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
+ final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+
+ if (maxBytes != null && maxBytes.longValue() < minBytes) {
+ problems.add(new ValidationResult.Builder().subject(MIN_SIZE.getName()).input(
+ context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size must be less than or equal to Max Size").build());
+ }
+
+ final Long min = context.getProperty(MIN_ENTRIES).asLong();
+ final Long max = context.getProperty(MAX_ENTRIES).asLong();
+
+ if (min != null && max != null) {
+ if (min > max) {
+ problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
+ }
+ }
+
+ return problems;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/98afcce0/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index f2e4a8d..73cb5a6 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -23,7 +23,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -31,59 +30,47 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.Bin;
import org.apache.nifi.processors.standard.util.BinManager;
import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFilePackager;
import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2;
import org.apache.nifi.util.FlowFilePackagerV3;
import org.apache.nifi.util.ObjectHolder;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
@SideEffectFree
@TriggerWhenEmpty
@Tags({"merge", "content", "correlation", "tar", "zip", "stream", "concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"})
@CapabilityDescription("Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.")
-public class MergeContent extends AbstractSessionFactoryProcessor {
+public class MergeContent extends BinFiles {
// preferred attributes
public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
@@ -207,160 +194,82 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
.defaultValue("false")
.build();
- public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
- .name("Minimum Group Size")
- .description("The minimum size of for the bundle")
- .required(true)
- .defaultValue("0 B")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .build();
- public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
- .name("Maximum Group Size")
- .description("The maximum size for the bundle. If not specified, there is no maximum.")
- .required(false)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
- .name("Minimum Number of Entries")
- .description("The minimum number of files to include in a bundle")
- .required(true)
- .defaultValue("1")
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
- public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
- .name("Maximum Number of Entries")
- .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
- .required(false)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
- .name("Maximum number of Bins")
- .description("Specifies the maximum number of bins that can be held in memory at any one time")
- .defaultValue("100")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
- .name("Max Bin Age")
- .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
- .required(false)
- .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
- .build();
-
- public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
public static final Relationship REL_MERGED = new Relationship.Builder().name("merged").description("The FlowFile containing the merged content").build();
- public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build();
public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> descriptors;
- private final BinManager binManager = new BinManager();
-
- private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
+ @Override
+ protected Set<Relationship> defineAdditionalRelationships() {
final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_ORIGINAL);
relationships.add(REL_MERGED);
- relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
+
+ return relationships;
+ }
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ @Override
+ protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(MERGE_STRATEGY);
descriptors.add(MERGE_FORMAT);
descriptors.add(ATTRIBUTE_STRATEGY);
descriptors.add(CORRELATION_ATTRIBUTE_NAME);
- descriptors.add(MIN_ENTRIES);
- descriptors.add(MAX_ENTRIES);
- descriptors.add(MIN_SIZE);
- descriptors.add(MAX_SIZE);
- descriptors.add(MAX_BIN_AGE);
- descriptors.add(MAX_BIN_COUNT);
descriptors.add(HEADER);
descriptors.add(FOOTER);
descriptors.add(DEMARCATOR);
descriptors.add(COMPRESSION_LEVEL);
descriptors.add(KEEP_PATH);
-
- this.descriptors = Collections.unmodifiableList(descriptors);
- }
-
- @OnStopped
- public void resetState() {
- binManager.purge();
-
- Bin bin;
- while ((bin = readyBins.poll()) != null) {
- for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
- wrapper.getSession().rollback();
- }
- }
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+
return descriptors;
- }
+ }
private byte[] readContent(final String filename) throws IOException {
return Files.readAllBytes(Paths.get(filename));
}
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
- int binsAdded = binFlowFiles(context, sessionFactory);
- getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
-
- if (!isScheduled()) {
- return;
+
+ @Override
+ protected FlowFile preprocessFlowFile(ProcessContext context,
+ ProcessSession session, FlowFile flowFile) {
+
+ // handle backward compatibility with old segment attributes
+ if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+ flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+ }
+ if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+ flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
}
+ if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+ flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+ }
+
+ return flowFile;
+ }
- binsAdded += migrateBins(context);
+ @Override
+ protected String getGroupId(ProcessContext context, FlowFile flowFile) {
- final int binsProcessed = processBins(context, sessionFactory);
- if (binsProcessed == 0 && binsAdded == 0) {
- context.yield();
- }
- }
-
-
- private int migrateBins(final ProcessContext context) {
- int added = 0;
- for (final Bin bin : binManager.removeReadyBins(true)) {
- this.readyBins.add(bin);
- added++;
- }
-
- // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
- // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
- // bins. So we may as well expire it now.
- if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
- final Bin bin = binManager.removeOldestBin();
- if (bin != null) {
- added++;
- this.readyBins.add(bin);
- }
+ final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
+ String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
+
+ // when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
+ if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+ groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
}
- return added;
- }
+ return groupId;
+ }
- private int processBins(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
- final Bin bin = readyBins.poll();
- if (bin == null) {
- return 0;
+ @Override
+ protected void setUpBinManager(BinManager binManager, ProcessContext context) {
+ if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+ binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
}
+ }
+
+ @Override
+ protected boolean processBin(Bin unmodifiableBin,
+ List<FlowFileSessionWrapper> binCopy, ProcessContext context,
+ ProcessSession session, ProcessorLog logger) throws Exception {
final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
MergeBin merger;
@@ -398,130 +307,45 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
break;
}
- final List<Bin> bins = new ArrayList<>();
- bins.add(bin);
-
- final ProcessorLog logger = getLogger();
- final ProcessSession session = sessionFactory.createSession();
-
- final Set<Bin> committedBins = new HashSet<>();
-
- for (final Bin unmodifiableBin : bins) {
- final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(unmodifiableBin.getContents());
-
- if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
- final String error = getDefragmentValidationError(binCopy);
- if (error != null) {
- final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
- logger.error(error + "; routing {} to failure", new Object[]{binDescription});
- for ( final FlowFileSessionWrapper wrapper : binCopy ) {
- wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
- wrapper.getSession().commit();
- committedBins.add(unmodifiableBin);
- }
-
- continue;
- }
-
- Collections.sort(binCopy, new FragmentComparator());
- }
-
- FlowFile bundle = null;
- try {
- bundle = merger.merge(context, session, binCopy);
-
- // keep the filename, as it is added to the bundle.
- final String filename = bundle.getAttribute(CoreAttributes.FILENAME.key());
-
- // merge all of the attributes
- final Map<String, String> bundleAttributes = attributeStrategy.getMergedAttributes(binCopy);
- bundleAttributes.put(CoreAttributes.MIME_TYPE.key(), merger.getMergedContentType());
- // restore the filename of the bundle
- bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
- bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(binCopy.size()));
- bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(bin.getBinAge()));
-
- bundle = session.putAllAttributes(bundle, bundleAttributes);
-
- final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
- logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
- } catch (final Exception e) {
- logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
- for (final FlowFileSessionWrapper wrapper : binCopy) {
+ if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+ final String error = getDefragmentValidationError(binCopy);
+
+ // Fail the flow files and commit them
+ if (error != null) {
+ final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
+ logger.error(error + "; routing {} to failure", new Object[]{binDescription});
+ for ( final FlowFileSessionWrapper wrapper : binCopy ) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
wrapper.getSession().commit();
}
- session.rollback();
- return 1;
- }
- session.transfer(bundle, REL_MERGED);
- }
-
- // we first commit the bundle's session before the originals' sessions because if we are restarted or crash
- // between commits, we favor data redundancy over data loss. Since we have no Distributed Transaction capability
- // across multiple sessions, we cannot guarantee atomicity across the sessions
- session.commit();
- for (final Bin unmodifiableBin : bins) {
- // If this bin's session has been committed, move on.
- if ( committedBins.contains(unmodifiableBin) ) {
- continue;
- }
-
- for (final FlowFileSessionWrapper wrapper : unmodifiableBin.getContents()) {
- wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL);
- wrapper.getSession().commit();
+
+ return true;
}
+ Collections.sort(binCopy, new FragmentComparator());
}
- return 1;
- }
-
- private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
- int binsAdded = 0;
- while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
- if (!isScheduled()) {
- return binsAdded;
- }
-
- final ProcessSession session = sessionFactory.createSession();
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return binsAdded;
- }
+ FlowFile bundle = merger.merge(context, session, binCopy);
- // handle backward compatibility with old segment attributes
- if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
- flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
- }
- if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
- flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
- }
- if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
- flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
- }
+ // keep the filename, as it is added to the bundle.
+ final String filename = bundle.getAttribute(CoreAttributes.FILENAME.key());
- final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
- String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
+ // merge all of the attributes
+ final Map<String, String> bundleAttributes = attributeStrategy.getMergedAttributes(binCopy);
+ bundleAttributes.put(CoreAttributes.MIME_TYPE.key(), merger.getMergedContentType());
+ // restore the filename of the bundle
+ bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
+ bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(binCopy.size()));
+ bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(unmodifiableBin.getBinAge()));
- // when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
- if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
- groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
- }
+ bundle = session.putAllAttributes(bundle, bundleAttributes);
- final boolean binned = binManager.offer(groupId, flowFile, session);
+ final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
+ logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
+ session.transfer(bundle, REL_MERGED);
- // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
- if (!binned) {
- Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
- bin.offer(flowFile, session);
- this.readyBins.add(bin);
- }
-
- binsAdded++;
- }
-
- return binsAdded;
+ // We haven't committed anything, parent will take care of it
+ return false;
}
private String getDefragmentValidationError(final List<FlowFileSessionWrapper> bin) {
@@ -578,60 +402,6 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
return NUMBER_PATTERN.matcher(value).matches();
}
- @OnScheduled
- public void onScheduled(final ProcessContext context) throws IOException {
- binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
-
- if (context.getProperty(MAX_BIN_AGE).isSet() ) {
- binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
- } else {
- binManager.setMaxBinAge(Integer.MAX_VALUE);
- }
-
- if ( context.getProperty(MAX_SIZE).isSet() ) {
- binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
- } else {
- binManager.setMaximumSize(Long.MAX_VALUE);
- }
-
- if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
- binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
- } else {
- binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
-
- if ( context.getProperty(MAX_ENTRIES).isSet() ) {
- binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
- } else {
- binManager.setMaximumEntries(Integer.MAX_VALUE);
- }
- }
-
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(final ValidationContext context) {
- final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
-
- final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
- final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
-
- if (maxBytes != null && maxBytes.longValue() < minBytes) {
- problems.add(new ValidationResult.Builder().subject(MIN_SIZE.getName()).input(
- context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size must be less than or equal to Max Size").build());
- }
-
- final Long min = context.getProperty(MIN_ENTRIES).asLong();
- final Long max = context.getProperty(MAX_ENTRIES).asLong();
-
- if (min != null && max != null) {
- if (min > max) {
- problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
- }
- }
-
- return problems;
- }
-
private class BinaryConcatenationMerge implements MergeBin {
private String mimeType = "application/octet-stream";
[4/6] incubator-nifi git commit: Merge branch 'NIFI-305' of
https://github.com/gresockj/incubator-nifi into develop
Posted by ma...@apache.org.
Merge branch 'NIFI-305' of https://github.com/gresockj/incubator-nifi into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c01dff59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c01dff59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c01dff59
Branch: refs/heads/develop
Commit: c01dff5922239a28d02d407fb86f27f110aceeab
Parents: bafa945 615794e
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 13:23:36 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 13:23:36 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/BinFiles.java | 405 +++++++++++++++++++
.../nifi/processors/standard/MergeContent.java | 400 ++++--------------
2 files changed, 490 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
[5/6] incubator-nifi git commit: NIFI-305: Slight refactorings to
provide more flexibility in concrete implementations
Posted by ma...@apache.org.
NIFI-305: Slight refactorings to provide more flexibility in concrete implementations
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ec7f7e77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ec7f7e77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ec7f7e77
Branch: refs/heads/develop
Commit: ec7f7e7717750276a6cbf56edbfcca49b1299fa5
Parents: c01dff5
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 13:51:51 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 13:51:51 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/BinFiles.java | 72 +++-----------------
.../nifi/processors/standard/MergeContent.java | 69 ++++++++++---------
2 files changed, 46 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index 0a65c59..3d7dba1 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -19,11 +19,8 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -39,7 +36,6 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@@ -99,38 +95,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build();
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> descriptors;
private final BinManager binManager = new BinManager();
-
private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
- @Override
- protected final void init(final ProcessorInitializationContext context) {
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_ORIGINAL);
- relationships.add(REL_FAILURE);
- Set<Relationship> additionalRelationships = defineAdditionalRelationships();
- if (additionalRelationships != null) {
- relationships.addAll(additionalRelationships);
- }
- this.relationships = Collections.unmodifiableSet(relationships);
-
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(MIN_ENTRIES);
- descriptors.add(MAX_ENTRIES);
- descriptors.add(MIN_SIZE);
- descriptors.add(MAX_SIZE);
- descriptors.add(MAX_BIN_AGE);
- descriptors.add(MAX_BIN_COUNT);
- List<PropertyDescriptor> additionalPropertyDescriptors = this.defineAdditionalPropertyDescriptors();
- if (additionalPropertyDescriptors != null) {
- descriptors.addAll(additionalPropertyDescriptors);
- }
-
- this.descriptors = Collections.unmodifiableList(descriptors);
- }
@OnStopped
public final void resetState() {
@@ -144,27 +111,6 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
}
- @Override
- public final Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
- }
-
- /**
- * Allows any additional relationships to be defined.
- * @return Relationships to be added in the init() method
- */
- protected abstract Set<Relationship> defineAdditionalRelationships();
-
- /**
- * Allows any additional property descriptors to be defined.
- * @return Properties to be added in the init() method
- */
- protected abstract List<PropertyDescriptor> defineAdditionalPropertyDescriptors();
/**
* Allows general pre-processing of a flow file before it is offered to a
@@ -213,14 +159,14 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
* to Failure and commit their sessions. If false, the
* processBins() method will transfer the files to Original and commit
* the sessions
- * @throws Exception
- * This will be handled appropriately, and all flow files in the
- * bin will be transferred to failure and the session rolled
- * back
+ *
+ * @throws ProcessException if any problem arises while processing a bin
+ * of FlowFiles. All flow files in the
+ * bin will be transferred to failure and the ProcessSession provided by
+ * the 'session' argument will be rolled back.
*/
- protected abstract boolean processBin(Bin unmodifiableBin,
- List<FlowFileSessionWrapper> binContents, ProcessContext context,
- ProcessSession session, ProcessorLog logger) throws Exception;
+ protected abstract boolean processBin(Bin unmodifiableBin,
+ List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException;
/**
* Allows additional custom validation to be done. This will be called from
@@ -288,8 +234,8 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
boolean binAlreadyCommitted = false;
try {
- binAlreadyCommitted = this.processBin(bin, binCopy, context, session, logger);
- } catch (final Exception e) {
+ binAlreadyCommitted = this.processBin(bin, binCopy, context, session);
+ } catch (final ProcessException e) {
logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e});
for (final FlowFileSessionWrapper wrapper : binCopy) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 73cb5a6..a78bc07 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -46,10 +46,10 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@@ -198,56 +198,62 @@ public class MergeContent extends BinFiles {
public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+
@Override
- protected Set<Relationship> defineAdditionalRelationships() {
- final Set<Relationship> relationships = new HashSet<>();
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_ORIGINAL);
+ relationships.add(REL_FAILURE);
relationships.add(REL_MERGED);
-
return relationships;
- }
-
+ }
+
+
@Override
- protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() {
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(MERGE_STRATEGY);
- descriptors.add(MERGE_FORMAT);
- descriptors.add(ATTRIBUTE_STRATEGY);
- descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(MERGE_STRATEGY);
+ descriptors.add(MERGE_FORMAT);
+ descriptors.add(ATTRIBUTE_STRATEGY);
+ descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+ descriptors.add(MIN_ENTRIES);
+ descriptors.add(MAX_ENTRIES);
+ descriptors.add(MIN_SIZE);
+ descriptors.add(MAX_SIZE);
+ descriptors.add(MAX_BIN_AGE);
+ descriptors.add(MAX_BIN_COUNT);
descriptors.add(HEADER);
descriptors.add(FOOTER);
descriptors.add(DEMARCATOR);
descriptors.add(COMPRESSION_LEVEL);
descriptors.add(KEEP_PATH);
-
return descriptors;
}
-
+
private byte[] readContent(final String filename) throws IOException {
return Files.readAllBytes(Paths.get(filename));
}
@Override
- protected FlowFile preprocessFlowFile(ProcessContext context,
- ProcessSession session, FlowFile flowFile) {
-
+ protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+ FlowFile processed = flowFile;
// handle backward compatibility with old segment attributes
- if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
- flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+ if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+ processed = session.putAttribute(processed, FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
}
- if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
- flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
+ if (processed.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+ processed = session.putAttribute(processed, FRAGMENT_INDEX_ATTRIBUTE, processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
}
- if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
- flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+ if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+ processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE, processed.getAttribute(SEGMENT_ID_ATTRIBUTE));
}
- return flowFile;
+ return processed;
}
@Override
- protected String getGroupId(ProcessContext context, FlowFile flowFile) {
-
+ protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
@@ -260,16 +266,15 @@ public class MergeContent extends BinFiles {
}
@Override
- protected void setUpBinManager(BinManager binManager, ProcessContext context) {
+ protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
}
}
@Override
- protected boolean processBin(Bin unmodifiableBin,
- List<FlowFileSessionWrapper> binCopy, ProcessContext context,
- ProcessSession session, ProcessorLog logger) throws Exception {
+ protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy, final ProcessContext context,
+ final ProcessSession session) throws ProcessException {
final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
MergeBin merger;
@@ -314,7 +319,7 @@ public class MergeContent extends BinFiles {
// Fail the flow files and commit them
if (error != null) {
final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
- logger.error(error + "; routing {} to failure", new Object[]{binDescription});
+ getLogger().error(error + "; routing {} to failure", new Object[]{binDescription});
for ( final FlowFileSessionWrapper wrapper : binCopy ) {
wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
wrapper.getSession().commit();
@@ -341,7 +346,7 @@ public class MergeContent extends BinFiles {
bundle = session.putAllAttributes(bundle, bundleAttributes);
final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
- logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
+ getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
session.transfer(bundle, REL_MERGED);
// We haven't committed anything, parent will take care of it
[2/6] incubator-nifi git commit: NIFI-305: Minor documentation update
Posted by ma...@apache.org.
NIFI-305: Minor documentation update
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ad409034
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ad409034
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ad409034
Branch: refs/heads/develop
Commit: ad40903458cd2bbf98284d693a057519d8bbf775
Parents: 98afcce
Author: gresockj <jg...@gmail.com>
Authored: Tue Jan 27 17:51:16 2015 -0500
Committer: gresockj <jg...@gmail.com>
Committed: Tue Jan 27 17:51:16 2015 -0500
----------------------------------------------------------------------
.../main/java/org/apache/nifi/processors/standard/BinFiles.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ad409034/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index 7846c7d..b838d51 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -48,7 +48,7 @@ import org.apache.nifi.processors.standard.util.BinManager;
import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
/**
- * Base class for MergeContent.
+ * Base class for file-binning processors, including MergeContent.
*
*/
public abstract class BinFiles extends AbstractSessionFactoryProcessor {
[3/6] incubator-nifi git commit: NIFI-305: Cleaning up for
extensibility; final methods
Posted by ma...@apache.org.
NIFI-305: Cleaning up for extensibility; final methods
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/615794e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/615794e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/615794e7
Branch: refs/heads/develop
Commit: 615794e77fafc79ec6027484e584f76695a89573
Parents: ad40903
Author: gresockj <jg...@gmail.com>
Authored: Thu Jan 29 17:11:56 2015 -0500
Committer: gresockj <jg...@gmail.com>
Committed: Thu Jan 29 17:11:56 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/BinFiles.java | 31 +++++++++++++++-----
1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/615794e7/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index b838d51..0a65c59 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -106,7 +106,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
@Override
- protected void init(final ProcessorInitializationContext context) {
+ protected final void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_ORIGINAL);
@@ -133,7 +133,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
@OnStopped
- public void resetState() {
+ public final void resetState() {
binManager.purge();
Bin bin;
@@ -145,12 +145,12 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
@Override
- public Set<Relationship> getRelationships() {
+ public final Set<Relationship> getRelationships() {
return relationships;
}
@Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@@ -222,8 +222,20 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
List<FlowFileSessionWrapper> binContents, ProcessContext context,
ProcessSession session, ProcessorLog logger) throws Exception;
+ /**
+ * Allows additional custom validation to be done. This will be called from
+ * the parent's customValidate method.
+ *
+ * @param context
+ * The context
+ * @return Validation results indicating problems
+ */
+ protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
+ return new ArrayList<ValidationResult>();
+ }
+
@Override
- public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
int binsAdded = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
@@ -336,7 +348,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
@OnScheduled
- public void onScheduled(final ProcessContext context) throws IOException {
+ public final void onScheduled(final ProcessContext context) throws IOException {
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
if (context.getProperty(MAX_BIN_AGE).isSet() ) {
@@ -363,7 +375,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
}
@Override
- protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+ protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
@@ -382,6 +394,11 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
}
}
+
+ Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
+ if (otherProblems != null) {
+ problems.addAll(otherProblems);
+ }
return problems;
}
[6/6] incubator-nifi git commit: Merge branch 'develop' of
http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Posted by ma...@apache.org.
Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/54e922c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/54e922c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/54e922c8
Branch: refs/heads/develop
Commit: 54e922c8f5740cab723d16b66d12b2a753c242ed
Parents: ec7f7e7 ed8f771
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 15:52:26 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 15:52:26 2015 -0500
----------------------------------------------------------------------
nifi-nar-maven-plugin/pom.xml | 2 +-
nifi/nifi-api/pom.xml | 4 +-
nifi/nifi-assembly/pom.xml | 6 +-
nifi/nifi-bootstrap/pom.xml | 2 +-
.../nifi-data-provenance-utils/pom.xml | 4 +-
.../nifi-expression-language/pom.xml | 4 +-
.../nifi-commons/nifi-flowfile-packager/pom.xml | 4 +-
nifi/nifi-commons/nifi-logging-utils/pom.xml | 4 +-
.../nifi-processor-utilities/pom.xml | 4 +-
nifi/nifi-commons/nifi-properties/pom.xml | 4 +-
nifi/nifi-commons/nifi-security-utils/pom.xml | 4 +-
nifi/nifi-commons/nifi-socket-utils/pom.xml | 4 +-
nifi/nifi-commons/nifi-utils/pom.xml | 4 +-
nifi/nifi-commons/nifi-web-utils/pom.xml | 4 +-
nifi/nifi-commons/nifi-write-ahead-log/pom.xml | 4 +-
nifi/nifi-commons/pom.xml | 5 +-
nifi/nifi-docs/pom.xml | 6 +-
.../nifi-processor-bundle-archetype/pom.xml | 2 +-
nifi/nifi-maven-archetypes/pom.xml | 2 +-
nifi/nifi-mock/pom.xml | 4 +-
.../nifi-framework-nar/pom.xml | 4 +-
.../nifi-framework/nifi-administration/pom.xml | 4 +-
.../nifi-framework/nifi-client-dto/pom.xml | 2 +-
.../nifi-cluster-authorization-provider/pom.xml | 2 +-
.../nifi-cluster-protocol/pom.xml | 2 +-
.../nifi-framework/nifi-cluster-web/pom.xml | 2 +-
.../nifi-framework/nifi-cluster/pom.xml | 2 +-
.../nifi-file-authorization-provider/pom.xml | 4 +-
.../nifi-framework-core-api/pom.xml | 4 +-
.../nifi-framework/nifi-framework-core/pom.xml | 2 +-
.../nifi-framework/nifi-nar-utils/pom.xml | 2 +-
.../nifi-framework/nifi-resources/pom.xml | 2 +-
.../src/main/resources/conf/logback.xml | 4 +-
.../nifi-framework/nifi-runtime/pom.xml | 2 +-
.../nifi-framework/nifi-security/pom.xml | 4 +-
.../nifi-framework/nifi-site-to-site/pom.xml | 4 +-
.../nifi-framework/nifi-user-actions/pom.xml | 4 +-
.../nifi-web/nifi-custom-ui-utilities/pom.xml | 2 +-
.../nifi-framework/nifi-web/nifi-jetty/pom.xml | 2 +-
.../nifi-web/nifi-web-api/pom.xml | 4 +-
.../nifi/web/controller/ControllerFacade.java | 15 +-
.../nifi-web/nifi-web-docs/pom.xml | 2 +-
.../nifi-web/nifi-web-error/pom.xml | 2 +-
.../nifi-web-optimistic-locking/pom.xml | 2 +-
.../nifi-web/nifi-web-security/pom.xml | 2 +-
.../nifi-framework/nifi-web/nifi-web-ui/pom.xml | 2 +-
.../src/main/webapp/css/provenance.css | 5 +
.../webapp/js/nf/cluster/nf-cluster-table.js | 319 ++++++-----
.../webapp/js/nf/counters/nf-counters-table.js | 63 ++-
.../webapp/js/nf/history/nf-history-table.js | 147 +++---
.../js/nf/provenance/nf-provenance-table.js | 103 ++--
.../webapp/js/nf/summary/nf-summary-table.js | 523 +++++++------------
.../src/main/webapp/js/nf/summary/nf-summary.js | 6 -
.../js/nf/templates/nf-templates-table.js | 73 ++-
.../main/webapp/js/nf/users/nf-users-table.js | 440 ++++++++--------
.../nifi-framework/nifi-web/pom.xml | 10 +-
.../nifi-framework/pom.xml | 4 +-
.../nifi-framework-bundle/pom.xml | 36 +-
.../nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml | 4 +-
.../nifi-hdfs-processors/pom.xml | 2 +-
.../nifi-nar-bundles/nifi-hadoop-bundle/pom.xml | 6 +-
.../nifi-hadoop-libraries-nar/pom.xml | 4 +-
.../nifi-hadoop-libraries-bundle/pom.xml | 4 +-
nifi/nifi-nar-bundles/nifi-jetty-bundle/pom.xml | 4 +-
.../nifi-kafka-bundle/nifi-kafka-nar/pom.xml | 4 +-
.../nifi-kafka-processors/pom.xml | 2 +-
nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml | 4 +-
.../pom.xml | 2 +-
.../nifi-provenance-repository-nar/pom.xml | 4 +-
.../nifi-volatile-provenance-repository/pom.xml | 2 +-
.../nifi-provenance-repository-bundle/pom.xml | 8 +-
.../nifi-standard-nar/pom.xml | 4 +-
.../nifi-standard-prioritizers/pom.xml | 2 +-
.../nifi-standard-processors/pom.xml | 2 +-
.../nifi-standard-reporting-tasks/pom.xml | 2 +-
.../nifi-standard-bundle/pom.xml | 10 +-
.../pom.xml | 2 +-
.../pom.xml | 2 +-
.../nifi-distributed-cache-protocol/pom.xml | 2 +-
.../nifi-distributed-cache-server/pom.xml | 2 +-
.../nifi-distributed-cache-services-nar/pom.xml | 4 +-
.../pom.xml | 4 +-
.../nifi-load-distribution-service-api/pom.xml | 4 +-
.../nifi-ssl-context-nar/pom.xml | 4 +-
.../nifi-ssl-context-service/pom.xml | 2 +-
.../nifi-ssl-context-bundle/pom.xml | 4 +-
.../nifi-ssl-context-service-api/pom.xml | 2 +-
.../nifi-standard-services-api-nar/pom.xml | 4 +-
.../nifi-standard-services/pom.xml | 4 +-
.../nifi-update-attribute-model/pom.xml | 2 +-
.../nifi-update-attribute-nar/pom.xml | 4 +-
.../nifi-update-attribute-processor/pom.xml | 2 +-
.../nifi-update-attribute-ui/pom.xml | 2 +-
.../update/attributes/api/RuleResource.java | 16 +-
.../nifi-update-attribute-bundle/pom.xml | 10 +-
nifi/nifi-nar-bundles/pom.xml | 31 +-
nifi/pom.xml | 64 +--
97 files changed, 990 insertions(+), 1140 deletions(-)
----------------------------------------------------------------------