You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:04 UTC
[08/14] nifi git commit: NIFI-5516: Implement Load-Balanced
Connections Refactoring StandardFlowFileQueue to have an
AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added
documentation, cleaned up code some Refactored FlowFileQueue so th
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
new file mode 100644
index 0000000..f250200
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.queue.AbstractFlowFileQueue;
+import org.apache.nifi.controller.queue.ConnectionEventListener;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.QueueDiagnostics;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.StandardQueueDiagnostics;
+import org.apache.nifi.controller.queue.SwappablePriorityQueue;
+import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
+import org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner;
+import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner;
+import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
+import org.apache.nifi.controller.queue.clustered.partition.LocalPartitionPartitioner;
+import org.apache.nifi.controller.queue.clustered.partition.LocalQueuePartition;
+import org.apache.nifi.controller.queue.clustered.partition.QueuePartition;
+import org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition;
+import org.apache.nifi.controller.queue.clustered.partition.RemoteQueuePartition;
+import org.apache.nifi.controller.queue.clustered.partition.RoundRobinPartitioner;
+import org.apache.nifi.controller.queue.clustered.partition.StandardRebalancingPartition;
+import org.apache.nifi.controller.queue.clustered.partition.SwappablePriorityQueueLocalPartition;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue implements LoadBalancedFlowFileQueue {
+ private static final Logger logger = LoggerFactory.getLogger(SocketLoadBalancedFlowFileQueue.class);
+ private static final int NODE_SWAP_THRESHOLD = 1000;
+
+ private final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+ private final ConnectionEventListener eventListener;
+ private final AtomicReference<QueueSize> totalSize = new AtomicReference<>(new QueueSize(0, 0L));
+ private final LocalQueuePartition localPartition;
+ private final RebalancingPartition rebalancingPartition;
+ private final FlowFileSwapManager swapManager;
+ private final EventReporter eventReporter;
+ private final ClusterCoordinator clusterCoordinator;
+ private final AsyncLoadBalanceClientRegistry clientRegistry;
+
+ private final FlowFileRepository flowFileRepo;
+ private final ProvenanceEventRepository provRepo;
+ private final ContentRepository contentRepo;
+ private final Set<NodeIdentifier> nodeIdentifiers;
+
+ private final ReadWriteLock partitionLock = new ReentrantReadWriteLock();
+ private final Lock partitionReadLock = partitionLock.readLock();
+ private final Lock partitionWriteLock = partitionLock.writeLock();
+ private QueuePartition[] queuePartitions;
+ private FlowFilePartitioner partitioner;
+ private boolean stopped = true;
+
+
+ /**
+  * Creates a load-balanced FlowFile queue that distributes FlowFiles to other nodes over socket
+  * connections. One local partition always exists; one remote partition is created for every other
+  * node currently known to the cluster coordinator. The initial partitioner keeps everything on
+  * the local partition until a load-balance strategy is configured.
+  */
+ public SocketLoadBalancedFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo,
+ final ProvenanceEventRepository provRepo, final ContentRepository contentRepo, final ResourceClaimManager resourceClaimManager,
+ final ClusterCoordinator clusterCoordinator, final AsyncLoadBalanceClientRegistry clientRegistry, final FlowFileSwapManager swapManager,
+ final int swapThreshold, final EventReporter eventReporter) {
+
+ super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
+ this.eventListener = eventListener;
+ this.eventReporter = eventReporter;
+ this.swapManager = swapManager;
+ this.flowFileRepo = flowFileRepo;
+ this.provRepo = provRepo;
+ this.contentRepo = contentRepo;
+ this.clusterCoordinator = clusterCoordinator;
+ this.clientRegistry = clientRegistry;
+
+ localPartition = new SwappablePriorityQueueLocalPartition(swapManager, swapThreshold, eventReporter, this, this::drop);
+ rebalancingPartition = new StandardRebalancingPartition(swapManager, swapThreshold, eventReporter, this, this::drop);
+
+ // Create a RemoteQueuePartition for each node
+ // NOTE(review): when not clustered this is Collections.emptySet(), which is immutable, yet
+ // setNodeIdentifiers() later calls clear()/addAll() on this field — confirm that method is
+ // only ever invoked when clustered.
+ nodeIdentifiers = clusterCoordinator == null ? Collections.emptySet() : clusterCoordinator.getNodeIdentifiers();
+
+ // Sort by API address so that every node builds its partition array in the same order.
+ final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(nodeIdentifiers);
+ sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
+
+ if (sortedNodeIdentifiers.isEmpty()) {
+ // Standalone (or no nodes known yet): the local partition is the only partition.
+ queuePartitions = new QueuePartition[] { localPartition };
+ } else {
+ queuePartitions = new QueuePartition[sortedNodeIdentifiers.size()];
+
+ for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
+ final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
+ if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
+ queuePartitions[i] = localPartition;
+ } else {
+ queuePartitions[i] = createRemotePartition(nodeId);
+ }
+ }
+
+ }
+
+ // Default: keep all FlowFiles local until setLoadBalanceStrategy() configures otherwise.
+ partitioner = new LocalPartitionPartitioner();
+
+ if (clusterCoordinator != null) {
+ clusterCoordinator.registerEventListener(new ClusterEventListener());
+ }
+
+ rebalancingPartition.start(partitioner);
+ }
+
+
+    /**
+     * Updates how FlowFiles are partitioned across the cluster. If neither the strategy nor the
+     * partitioning attribute changed, or if this instance is not clustered, this is a no-op.
+     *
+     * @param strategy the load-balance strategy to apply
+     * @param partitioningAttribute the FlowFile attribute to partition by (relevant only for
+     *            PARTITION_BY_ATTRIBUTE)
+     * @throws IllegalArgumentException if the strategy is not a recognized value
+     */
+    @Override
+    public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
+        final LoadBalanceStrategy currentStrategy = getLoadBalanceStrategy();
+        final String currentPartitioningAttribute = getPartitioningAttribute();
+
+        super.setLoadBalanceStrategy(strategy, partitioningAttribute);
+
+        if (strategy == currentStrategy && Objects.equals(partitioningAttribute, currentPartitioningAttribute)) {
+            // Nothing changed.
+            return;
+        }
+
+        if (clusterCoordinator == null) {
+            // Not clustered so nothing to worry about.
+            return;
+        }
+
+        // We are already load balancing but are changing how we are load balancing.
+        final FlowFilePartitioner partitioner;
+        switch (strategy) {
+            case DO_NOT_LOAD_BALANCE:
+                partitioner = new LocalPartitionPartitioner();
+                break;
+            case PARTITION_BY_ATTRIBUTE:
+                partitioner = new CorrelationAttributePartitioner(partitioningAttribute);
+                break;
+            case ROUND_ROBIN:
+                partitioner = new RoundRobinPartitioner();
+                break;
+            case SINGLE_NODE:
+                partitioner = new FirstNodePartitioner();
+                break;
+            default:
+                // Include the unrecognized strategy in the message so the failure is diagnosable.
+                throw new IllegalArgumentException("Unsupported Load Balance Strategy: " + strategy);
+        }
+
+        setFlowFilePartitioner(partitioner);
+    }
+
+    /**
+     * Starts all partitions so that FlowFiles begin being distributed across the cluster.
+     * No-op if load balancing is already running.
+     */
+    public synchronized void startLoadBalancing() {
+        if (!stopped) {
+            // Already running; do not log "started" or restart the partitions.
+            return;
+        }
+
+        logger.debug("{} started. Will begin distributing FlowFiles across the cluster", this);
+        stopped = false;
+
+        // Read lock: we only iterate the partition array; we do not replace it.
+        partitionReadLock.lock();
+        try {
+            rebalancingPartition.start(partitioner);
+
+            for (final QueuePartition queuePartition : queuePartitions) {
+                queuePartition.start(partitioner);
+            }
+        } finally {
+            partitionReadLock.unlock();
+        }
+    }
+
+    /**
+     * Stops all partitions so that FlowFiles are no longer distributed across the cluster.
+     * No-op if load balancing is already stopped.
+     */
+    public synchronized void stopLoadBalancing() {
+        if (stopped) {
+            // Already stopped; do not log "stopped" or stop the partitions again.
+            return;
+        }
+
+        logger.debug("{} stopped. Will no longer distribute FlowFiles across the cluster", this);
+        stopped = true;
+
+        // Read lock: we only iterate the partition array; we do not replace it.
+        partitionReadLock.lock();
+        try {
+            rebalancingPartition.stop();
+            for (final QueuePartition queuePartition : queuePartitions) {
+                queuePartition.stop();
+            }
+        } finally {
+            partitionReadLock.unlock();
+        }
+    }
+
+    /**
+     * We are actively load balancing if and only if the queue holds at least one FlowFile that
+     * does not live in the local partition.
+     */
+    @Override
+    public boolean isActivelyLoadBalancing() {
+        final int totalCount = size().getObjectCount();
+        return totalCount > 0 && totalCount > localPartition.size().getObjectCount();
+    }
+
+ /**
+  * Creates a RemoteQueuePartition for the given node, backed by its own swappable queue. The
+  * partition is wired with a failure destination that decides, when a transfer to the node fails,
+  * whether the FlowFiles should be re-balanced across the cluster or returned to this node's queue.
+  */
+ private QueuePartition createRemotePartition(final NodeIdentifier nodeId) {
+ final SwappablePriorityQueue partitionQueue = new SwappablePriorityQueue(swapManager, NODE_SWAP_THRESHOLD, eventReporter, this, this::drop, nodeId.getId());
+
+ final TransferFailureDestination failureDestination = new TransferFailureDestination() {
+ @Override
+ public void putAll(final Collection<FlowFileRecord> flowFiles, final FlowFilePartitioner partitionerUsed) {
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ partitionReadLock.lock();
+ try {
+ if (isRebalanceOnFailure(partitionerUsed)) {
+ logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", flowFiles.size(), nodeId);
+ rebalancingPartition.rebalance(flowFiles);
+ } else {
+ logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are", flowFiles.size(), nodeId,
+ partitioner);
+ partitionQueue.putAll(flowFiles);
+ }
+ } finally {
+ partitionReadLock.unlock();
+ }
+ }
+
+ @Override
+ public void putAll(final Function<String, FlowFileQueueContents> queueContentsFunction, final FlowFilePartitioner partitionerUsed) {
+ partitionReadLock.lock();
+ try {
+ if (isRebalanceOnFailure(partitionerUsed)) {
+ // Package the full queue contents (active FlowFiles plus swap locations) for the rebalancing partition.
+ final FlowFileQueueContents contents = queueContentsFunction.apply(rebalancingPartition.getSwapPartitionName());
+ rebalancingPartition.rebalance(contents);
+ logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition",
+ contents.getActiveFlowFiles().size(), contents.getSwapLocations().size(), nodeId);
+ } else {
+ logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are", nodeId,
+ partitioner);
+ }
+ } finally {
+ partitionReadLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isRebalanceOnFailure(final FlowFilePartitioner partitionerUsed) {
+ partitionReadLock.lock();
+ try {
+ // If the partitioner changed since the transfer began, the old assignment is stale: rebalance.
+ if (!partitionerUsed.equals(partitioner)) {
+ return true;
+ }
+
+ return partitioner.isRebalanceOnFailure();
+ } finally {
+ partitionReadLock.unlock();
+ }
+ }
+ };
+
+ final QueuePartition partition = new RemoteQueuePartition(nodeId, partitionQueue, failureDestination, flowFileRepo, provRepo, contentRepo, clientRegistry, this);
+
+ // Only start the new partition if load balancing is currently running.
+ if (!stopped) {
+ partition.start(partitioner);
+ }
+
+ return partition;
+ }
+
+    /**
+     * @return a defensive copy of the configured prioritizers, so callers cannot mutate our state
+     */
+    @Override
+    public synchronized List<FlowFilePrioritizer> getPriorities() {
+        final List<FlowFilePrioritizer> copy = new ArrayList<>();
+        copy.addAll(prioritizers);
+        return copy;
+    }
+
+ /**
+  * Replaces the configured prioritizers and propagates the new list to every partition, including
+  * the rebalancing partition, so that all partitions order FlowFiles consistently.
+  */
+ @Override
+ public synchronized void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+ prioritizers.clear();
+ prioritizers.addAll(newPriorities);
+
+ partitionReadLock.lock();
+ try {
+ for (final QueuePartition partition : queuePartitions) {
+ partition.setPriorities(newPriorities);
+ }
+
+ rebalancingPartition.setPriorities(newPriorities);
+ } finally {
+ partitionReadLock.unlock();
+ }
+ }
+
+
+    /**
+     * Recovers all FlowFiles that were swapped out at the time of the last shutdown, across every
+     * partition. Swap files that belong to a partition we no longer know about are transferred to
+     * the rebalancing partition so the data is redistributed rather than lost.
+     *
+     * @return an aggregate summary of everything recovered (total queue size, max FlowFile ID,
+     *         resource claims)
+     */
+    @Override
+    public SwapSummary recoverSwappedFlowFiles() {
+        partitionReadLock.lock();
+        try {
+            // One summary per partition, plus one for the rebalancing partition (and possibly more
+            // for orphaned partitions below).
+            final List<SwapSummary> summaries = new ArrayList<>(queuePartitions.length + 1);
+
+            // Discover the names of all partitions that have data swapped out.
+            Set<String> partitionNamesToRecover;
+            try {
+                partitionNamesToRecover = swapManager.getSwappedPartitionNames(this);
+                logger.debug("For {}, partition names to recover are {}", this, partitionNamesToRecover);
+            } catch (final IOException ioe) {
+                logger.error("Failed to determine the names of the Partitions that have swapped FlowFiles for queue with ID {}.", getIdentifier(), ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine the names of Partitions that have swapped FlowFiles for queue with ID " +
+                        getIdentifier() + "; see logs for more details");
+                }
+
+                partitionNamesToRecover = Collections.emptySet();
+            }
+
+            // For each Queue Partition, recover swapped FlowFiles.
+            for (final QueuePartition partition : queuePartitions) {
+                partitionNamesToRecover.remove(partition.getSwapPartitionName());
+
+                final SwapSummary summary = partition.recoverSwappedFlowFiles();
+                summaries.add(summary);
+            }
+
+            // Recover any swap files that may belong to the 'rebalancing' partition
+            partitionNamesToRecover.remove(rebalancingPartition.getSwapPartitionName());
+            final SwapSummary rebalancingSwapSummary = rebalancingPartition.recoverSwappedFlowFiles();
+            summaries.add(rebalancingSwapSummary);
+
+            // If there is any Partition that has swapped FlowFiles but for which we don't have a Queue Partition created, we need to recover those swap locations
+            // and get their swap summaries now. We then transfer any Swap Files that existed for that partition to the 'rebalancing' partition so that the data
+            // will be rebalanced against the existing partitions. We do this to handle the following scenario:
+            // - NiFi is running in a cluster with 5 nodes.
+            // - A queue is load balanced across the cluster, with all partitions having data swapped out.
+            // - NiFi is shutdown and upgraded to a new version.
+            // - Admin failed to copy over the Managed State for the nodes from the old version to the new version.
+            // - Upon restart, NiFi does not know about any of the nodes in the cluster.
+            // - When a node joins and recovers swap locations, it is the only known node.
+            // - NiFi will not know that it needs a Remote Partition for nodes 2-5.
+            // - If we don't recover those partitions here, then we'll end up not recovering the Swap Files at all, which will result in the Content Claims
+            //   having their Claimant Counts decremented, which could lead to loss of the data from the Content Repository.
+            for (final String partitionName : partitionNamesToRecover) {
+                logger.info("Found Swap Files for FlowFile Queue with Identifier {} and Partition {} that has not been recovered yet. "
+                    + "Will recover Swap Files for this Partition even though no partition exists with this name yet", getIdentifier(), partitionName);
+
+                try {
+                    final List<String> swapLocations = swapManager.recoverSwapLocations(this, partitionName);
+                    for (final String swapLocation : swapLocations) {
+                        final SwapSummary swapSummary = swapManager.getSwapSummary(swapLocation);
+                        summaries.add(swapSummary);
+
+                        // Transfer the swap file to the rebalancing partition.
+                        final String updatedSwapLocation = swapManager.changePartitionName(swapLocation, rebalancingPartition.getSwapPartitionName());
+                        final FlowFileQueueContents queueContents = new FlowFileQueueContents(Collections.emptyList(), Collections.singletonList(updatedSwapLocation), swapSummary.getQueueSize());
+                        rebalancingPartition.rebalance(queueContents);
+                    }
+                } catch (IOException e) {
+                    logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {} and Partition {}", getIdentifier(), partitionName, e);
+                    if (eventReporter != null) {
+                        // Include the partition name for consistency with the log message above.
+                        eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " +
+                            getIdentifier() + " and Partition " + partitionName + "; see logs for more details");
+                    }
+                }
+            }
+
+            // Aggregate all of the individual summaries into a single result.
+            Long maxId = null;
+            QueueSize totalQueueSize = new QueueSize(0, 0L);
+            final List<ResourceClaim> resourceClaims = new ArrayList<>();
+
+            for (final SwapSummary summary : summaries) {
+                Long summaryMaxId = summary.getMaxFlowFileId();
+                if (summaryMaxId != null && (maxId == null || summaryMaxId > maxId)) {
+                    maxId = summaryMaxId;
+                }
+
+                final QueueSize summaryQueueSize = summary.getQueueSize();
+                totalQueueSize = totalQueueSize.add(summaryQueueSize);
+
+                final List<ResourceClaim> summaryResourceClaims = summary.getResourceClaims();
+                resourceClaims.addAll(summaryResourceClaims);
+            }
+
+            adjustSize(totalQueueSize.getObjectCount(), totalQueueSize.getByteCount());
+
+            return new StandardSwapSummary(totalQueueSize, maxId, resourceClaims);
+        } finally {
+            partitionReadLock.unlock();
+        }
+    }
+
+ /**
+  * Deletes all swap files for this queue by delegating to the swap manager.
+  */
+ @Override
+ public void purgeSwapFiles() {
+ swapManager.purge();
+ }
+
+ /**
+  * @return the total size of the queue across all partitions, local and remote
+  */
+ @Override
+ public QueueSize size() {
+ return totalSize.get();
+ }
+
+    /**
+     * The queue is empty when the tracked total (across all partitions) holds no FlowFiles.
+     */
+    @Override
+    public boolean isEmpty() {
+        final QueueSize queueSize = totalSize.get();
+        return queueSize.getObjectCount() == 0;
+    }
+
+ /**
+  * @return true if the local partition's active queue has no FlowFiles available to poll
+  */
+ @Override
+ public boolean isActiveQueueEmpty() {
+ return localPartition.isActiveQueueEmpty();
+ }
+
+    /**
+     * Builds a diagnostics snapshot consisting of the local partition's diagnostics plus one
+     * diagnostics entry per remote partition. Taken under the partition read lock so the partition
+     * array cannot change mid-snapshot.
+     */
+    @Override
+    public QueueDiagnostics getQueueDiagnostics() {
+        partitionReadLock.lock();
+        try {
+            final LocalQueuePartitionDiagnostics localDiagnostics = localPartition.getQueueDiagnostics();
+
+            final List<RemoteQueuePartitionDiagnostics> remoteDiagnostics = new ArrayList<>(queuePartitions.length - 1);
+            for (final QueuePartition partition : queuePartitions) {
+                if (!(partition instanceof RemoteQueuePartition)) {
+                    continue;
+                }
+
+                remoteDiagnostics.add(((RemoteQueuePartition) partition).getDiagnostics());
+            }
+
+            return new StandardQueueDiagnostics(localDiagnostics, remoteDiagnostics);
+        } finally {
+            partitionReadLock.unlock();
+        }
+    }
+
+ /**
+  * @return the partition that holds FlowFiles destined for this node; exposed for subclasses/tests
+  */
+ protected LocalQueuePartition getLocalPartition() {
+ return localPartition;
+ }
+
+ /**
+  * @return the current number of partitions (read under the partition read lock, since the
+  * partition array is swapped out when cluster topology changes)
+  */
+ protected int getPartitionCount() {
+ partitionReadLock.lock();
+ try {
+ return queuePartitions.length;
+ } finally {
+ partitionReadLock.unlock();
+ }
+ }
+
+    /**
+     * Returns the partition at the given index in the current partition array.
+     *
+     * @param index the partition index
+     * @return the partition at that index
+     * @throws IndexOutOfBoundsException if the index is negative or beyond the partition count
+     */
+    protected QueuePartition getPartition(final int index) {
+        partitionReadLock.lock();
+        try {
+            if (index < 0 || index >= queuePartitions.length) {
+                // Include the offending index and the bound so the failure is diagnosable.
+                throw new IndexOutOfBoundsException("Partition index " + index + " is out of bounds; there are " + queuePartitions.length + " partitions");
+            }
+
+            return queuePartitions[index];
+        } finally {
+            partitionReadLock.unlock();
+        }
+    }
+
+    /**
+     * Atomically adds the given deltas (which may be negative) to the queue's total size.
+     * Uses {@link AtomicReference#updateAndGet} instead of a hand-rolled compare-and-set loop;
+     * it retries internally on contention.
+     *
+     * @param countToAdd delta for the FlowFile count
+     * @param bytesToAdd delta for the byte count
+     */
+    private void adjustSize(final int countToAdd, final long bytesToAdd) {
+        totalSize.updateAndGet(queueSize -> queueSize.add(countToAdd, bytesToAdd));
+    }
+
+    /**
+     * Called when FlowFiles have been transferred out of this queue (e.g. sent to another node);
+     * removes their count and bytes from the queue's total size.
+     *
+     * @param flowFiles the transferred FlowFiles; null or empty is a no-op (consistent with onAbort)
+     */
+    public void onTransfer(final Collection<FlowFileRecord> flowFiles) {
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            return;
+        }
+
+        adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
+    }
+
+ public void onAbort(final Collection<FlowFileRecord> flowFiles) {
+ if (flowFiles == null || flowFiles.isEmpty()) {
+ return;
+ }
+
+ adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
+ }
+
+ /**
+ * Determines which QueuePartition the given FlowFile belongs to. Must be called with partition read lock held.
+ *
+ * @param flowFile the FlowFile
+ * @return the QueuePartition that the FlowFile belongs to
+ */
+ private QueuePartition getPartition(final FlowFileRecord flowFile) {
+ // Delegates the decision to the currently configured partitioner, which may consult the
+ // FlowFile's attributes and the full partition array.
+ final QueuePartition queuePartition = partitioner.getPartition(flowFile, queuePartitions, localPartition);
+ logger.debug("{} Assigning {} to Partition: {}", this, flowFile, queuePartition);
+ return queuePartition;
+ }
+
+ /**
+  * Rebuilds the partition array to reflect a new cluster topology: stops all partitions, reuses
+  * existing partitions for nodes that remain, creates partitions for new nodes, migrates data
+  * from removed nodes (or rebalances everything, if the partitioner requires it), and restarts
+  * the partitions if load balancing is running. Performed under the partition write lock.
+  */
+ public void setNodeIdentifiers(final Set<NodeIdentifier> updatedNodeIdentifiers, final boolean forceUpdate) {
+ partitionWriteLock.lock();
+ try {
+ // If nothing is changing, then just return
+ if (!forceUpdate && this.nodeIdentifiers.equals(updatedNodeIdentifiers)) {
+ logger.debug("{} Not going to rebalance Queue even though setNodeIdentifiers was called, because the new set of Node Identifiers is the same as the existing set", this);
+ return;
+ }
+
+ logger.debug("{} Stopping the {} queue partitions in order to change node identifiers from {} to {}", this, queuePartitions.length, this.nodeIdentifiers, updatedNodeIdentifiers);
+ for (final QueuePartition queuePartition : queuePartitions) {
+ queuePartition.stop();
+ }
+
+ // Determine which Node Identifiers, if any, were removed.
+ final Set<NodeIdentifier> removedNodeIds = new HashSet<>(this.nodeIdentifiers);
+ removedNodeIds.removeAll(updatedNodeIdentifiers);
+ logger.debug("{} The following Node Identifiers were removed from the cluster: {}", this, removedNodeIds);
+
+ // Build up a Map of Node ID to Queue Partition so that we can easily pull over the existing
+ // QueuePartition objects instead of having to create new ones.
+ final Map<NodeIdentifier, QueuePartition> partitionMap = new HashMap<>();
+ for (final QueuePartition partition : this.queuePartitions) {
+ final Optional<NodeIdentifier> nodeIdOption = partition.getNodeIdentifier();
+ nodeIdOption.ifPresent(nodeIdentifier -> partitionMap.put(nodeIdentifier, partition));
+ }
+
+ // Re-define 'queuePartitions' array
+ // Sorted by API address so every node builds the same partition order.
+ final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers);
+ sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
+
+ final QueuePartition[] updatedQueuePartitions;
+ if (sortedNodeIdentifiers.isEmpty()) {
+ updatedQueuePartitions = new QueuePartition[] { localPartition };
+ } else {
+ updatedQueuePartitions = new QueuePartition[sortedNodeIdentifiers.size()];
+ }
+
+ // Populate the new QueuePartitions.
+ // NOTE(review): clusterCoordinator is dereferenced here without a null check; presumably this
+ // method is only invoked in clustered mode — confirm against callers.
+ for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
+ final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
+ if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
+ updatedQueuePartitions[i] = localPartition;
+
+ // If we have RemoteQueuePartition with this Node ID with data, that data must be migrated to the local partition.
+ // This can happen if we didn't previously know our Node UUID.
+ final QueuePartition existingPartition = partitionMap.get(nodeId);
+ if (existingPartition != null && existingPartition != localPartition) {
+ final FlowFileQueueContents partitionContents = existingPartition.packageForRebalance(localPartition.getSwapPartitionName());
+ logger.debug("Transferred data from {} to {}", existingPartition, localPartition);
+ localPartition.inheritQueueContents(partitionContents);
+ }
+
+ continue;
+ }
+
+ final QueuePartition existingPartition = partitionMap.get(nodeId);
+ updatedQueuePartitions[i] = existingPartition == null ? createRemotePartition(nodeId) : existingPartition;
+ }
+
+ // If the partition requires that all partitions be re-balanced when the number of partitions changes, then do so.
+ // Otherwise, just rebalance the data from any Partitions that were removed, if any.
+ if (partitioner.isRebalanceOnClusterResize()) {
+ for (final QueuePartition queuePartition : this.queuePartitions) {
+ logger.debug("Rebalancing {}", queuePartition);
+ rebalance(queuePartition);
+ }
+ } else {
+ // Not all partitions need to be rebalanced, so just ensure that we rebalance any FlowFiles that are destined
+ // for a node that is no longer in the cluster.
+ for (final NodeIdentifier removedNodeId : removedNodeIds) {
+ final QueuePartition removedPartition = partitionMap.get(removedNodeId);
+ if (removedPartition == null) {
+ continue;
+ }
+
+ logger.debug("Rebalancing {}", removedPartition);
+ rebalance(removedPartition);
+ }
+ }
+
+ // Unregister any client for which the node was removed from the cluster
+ for (final NodeIdentifier removedNodeId : removedNodeIds) {
+ final QueuePartition removedPartition = partitionMap.get(removedNodeId);
+ if (removedPartition instanceof RemoteQueuePartition) {
+ ((RemoteQueuePartition) removedPartition).onRemoved();
+ }
+ }
+
+
+ // NOTE(review): nodeIdentifiers may have been initialized to Collections.emptySet() (immutable)
+ // in the constructor when not clustered; clear() would then throw UnsupportedOperationException.
+ // It may also alias the set returned by the cluster coordinator — confirm ownership.
+ this.nodeIdentifiers.clear();
+ this.nodeIdentifiers.addAll(updatedNodeIdentifiers);
+
+ this.queuePartitions = updatedQueuePartitions;
+
+ logger.debug("{} Restarting the {} queue partitions now that node identifiers have been updated", this, queuePartitions.length);
+ if (!stopped) {
+ for (final QueuePartition queuePartition : updatedQueuePartitions) {
+ queuePartition.start(partitioner);
+ }
+ }
+ } finally {
+ partitionWriteLock.unlock();
+ }
+ }
+
+ /**
+  * Drains the given partition's contents (active FlowFiles and swap files) into the rebalancing
+  * partition, which will redistribute them according to the current partitioner.
+  */
+ protected void rebalance(final QueuePartition partition) {
+ final FlowFileQueueContents contents = partition.packageForRebalance(rebalancingPartition.getSwapPartitionName());
+ rebalancingPartition.rebalance(contents);
+ }
+
+ /**
+  * Enqueues a single FlowFile, routing it to whichever partition the partitioner selects.
+  */
+ @Override
+ public void put(final FlowFileRecord flowFile) {
+ putAndGetPartition(flowFile);
+ }
+
+ /**
+  * Enqueues a single FlowFile and returns the partition it was routed to. The total size is
+  * adjusted before the FlowFile is handed to the partition (see the race-condition note in
+  * putAllAndGetPartitions for why the order matters).
+  */
+ protected QueuePartition putAndGetPartition(final FlowFileRecord flowFile) {
+ final QueuePartition partition;
+
+ partitionReadLock.lock();
+ try {
+ adjustSize(1, flowFile.getSize());
+
+ partition = getPartition(flowFile);
+ partition.put(flowFile);
+ } finally {
+ partitionReadLock.unlock();
+ }
+
+ // Notify the destination that data is available, outside the lock.
+ eventListener.triggerDestinationEvent();
+ return partition;
+ }
+
+ /**
+  * Accepts FlowFiles that another node pushed to this node. If the partitioner rebalances on
+  * cluster resize, the FlowFiles are re-partitioned via putAll() (which adjusts the total size
+  * itself); otherwise they go straight to the local partition and the size is adjusted here.
+  */
+ public void receiveFromPeer(final Collection<FlowFileRecord> flowFiles) {
+ partitionReadLock.lock();
+ try {
+ if (partitioner.isRebalanceOnClusterResize()) {
+ logger.debug("Received the following FlowFiles from Peer: {}. Will re-partition FlowFiles to ensure proper balancing across the cluster.", flowFiles);
+ putAll(flowFiles);
+ } else {
+ logger.debug("Received the following FlowFiles from Peer: {}. Will accept FlowFiles to the local partition", flowFiles);
+ localPartition.putAll(flowFiles);
+ adjustSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
+ }
+ } finally {
+ partitionReadLock.unlock();
+ }
+ }
+
+ /**
+  * Enqueues a collection of FlowFiles, distributing them across partitions per the partitioner.
+  */
+ @Override
+ public void putAll(final Collection<FlowFileRecord> flowFiles) {
+ putAllAndGetPartitions(flowFiles);
+ }
+
+ /**
+  * Enqueues a collection of FlowFiles and returns the mapping of partition to the FlowFiles that
+  * were routed to it.
+  */
+ protected Map<QueuePartition, List<FlowFileRecord>> putAllAndGetPartitions(final Collection<FlowFileRecord> flowFiles) {
+ partitionReadLock.lock();
+ try {
+ // NOTE WELL: It is imperative that we adjust the size of the queue here before distributing FlowFiles to partitions.
+ // If we do it the other way around, we could encounter a race condition where we distribute a FlowFile to the Local Partition,
+ // but have not yet adjusted the size. The processor consuming from this queue could then poll() the FlowFile, and acknowledge it.
+ // If that happens before we adjust the size, then we can end up with a negative Queue Size, which will throw an IllegalArgumentException,
+ // and we end up with the wrong Queue Size.
+ final long bytes = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum();
+ adjustSize(flowFiles.size(), bytes);
+
+ final Map<QueuePartition, List<FlowFileRecord>> partitionMap = distributeToPartitionsAndGet(flowFiles);
+
+ return partitionMap;
+ } finally {
+ partitionReadLock.unlock();
+
+ // Runs after the unlock, even if distribution threw; the destination is always notified.
+ eventListener.triggerDestinationEvent();
+ }
+ }
+
+ /**
+  * Distributes FlowFiles to their partitions without adjusting the queue's total size
+  * (the caller is responsible for size accounting).
+  */
+ @Override
+ public void distributeToPartitions(final Collection<FlowFileRecord> flowFiles) {
+ distributeToPartitionsAndGet(flowFiles);
+ }
+
+ /**
+  * Distributes FlowFiles to their partitions (per the current partitioner) and returns the
+  * partition-to-FlowFiles mapping. Does not adjust the queue's total size.
+  */
+ public Map<QueuePartition, List<FlowFileRecord>> distributeToPartitionsAndGet(final Collection<FlowFileRecord> flowFiles) {
+ if (flowFiles == null || flowFiles.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final Map<QueuePartition, List<FlowFileRecord>> partitionMap;
+
+ partitionReadLock.lock();
+ try {
+ // Optimize for the most common case (no load balancing) so that we will just call getPartition() for the first FlowFile
+ // in the Collection and then put all FlowFiles into that QueuePartition. Is fairly expensive to call stream().collect(#groupingBy).
+ if (partitioner.isPartitionStatic()) {
+ final QueuePartition partition = getPartition(flowFiles.iterator().next());
+ partition.putAll(flowFiles);
+
+ // Avoid copying when the caller already gave us a List.
+ final List<FlowFileRecord> flowFileList = (flowFiles instanceof List) ? (List<FlowFileRecord>) flowFiles : new ArrayList<>(flowFiles);
+ partitionMap = Collections.singletonMap(partition, flowFileList);
+
+ logger.debug("Partitioner is static so Partitioned FlowFiles as: {}", partitionMap);
+ return partitionMap;
+ }
+
+ partitionMap = flowFiles.stream().collect(Collectors.groupingBy(this::getPartition));
+ logger.debug("Partitioned FlowFiles as: {}", partitionMap);
+
+ for (final Map.Entry<QueuePartition, List<FlowFileRecord>> entry : partitionMap.entrySet()) {
+ final QueuePartition partition = entry.getKey();
+ final List<FlowFileRecord> flowFilesForPartition = entry.getValue();
+
+ partition.putAll(flowFilesForPartition);
+ }
+ } finally {
+ partitionReadLock.unlock();
+ }
+
+ return partitionMap;
+ }
+
+ /**
+  * Swaps in a new partitioner and rebalances every partition so existing FlowFiles are
+  * redistributed according to the new strategy. Taken under the write lock because the
+  * partitioner field is read by every put/distribute path under the read lock.
+  */
+ protected void setFlowFilePartitioner(final FlowFilePartitioner partitioner) {
+ partitionWriteLock.lock();
+ try {
+ if (this.partitioner.equals(partitioner)) {
+ return;
+ }
+
+ this.partitioner = partitioner;
+
+ for (final QueuePartition partition : this.queuePartitions) {
+ rebalance(partition);
+ }
+ } finally {
+ partitionWriteLock.unlock();
+ }
+ }
+
+    /**
+     * Polls the next available FlowFile from the local partition, then passes any records
+     * collected into {@code expiredRecords} during the poll to {@code onAbort}.
+     */
+    @Override
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+        final FlowFileRecord polled = localPartition.poll(expiredRecords);
+        onAbort(expiredRecords);
+        return polled;
+    }
+
+    /**
+     * Polls up to {@code maxResults} FlowFiles from the local partition, then passes any records
+     * collected into {@code expiredRecords} during the poll to {@code onAbort}.
+     */
+    @Override
+    public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
+        final List<FlowFileRecord> polled = localPartition.poll(maxResults, expiredRecords);
+        onAbort(expiredRecords);
+        return polled;
+    }
+
+    /**
+     * Polls FlowFiles from the local partition according to the given filter, then passes any
+     * records collected into {@code expiredRecords} during the poll to {@code onAbort}.
+     */
+    @Override
+    public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
+        final List<FlowFileRecord> polled = localPartition.poll(filter, expiredRecords);
+        onAbort(expiredRecords);
+        return polled;
+    }
+
+    /**
+     * Acknowledges the given FlowFile against the local partition, shrinks this queue's size
+     * by one FlowFile / its byte count, and notifies the source-side event listener.
+     */
+    @Override
+    public void acknowledge(final FlowFileRecord flowFile) {
+        localPartition.acknowledge(flowFile);
+        adjustSize(-1, -flowFile.getSize());
+        eventListener.triggerSourceEvent();
+    }
+
+ @Override
+ public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
+ localPartition.acknowledge(flowFiles);
+
+ if (!flowFiles.isEmpty()) {
+ final long bytes = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum();
+ adjustSize(-flowFiles.size(), -bytes);
+ }
+
+ eventListener.triggerSourceEvent();
+ }
+
+ // Reports whether the local partition currently holds any un-acknowledged FlowFiles.
+ @Override
+ public boolean isUnacknowledgedFlowFile() {
+ return localPartition.isUnacknowledgedFlowFile();
+ }
+
+ // Looks up a FlowFile by UUID in the local partition only; partitions on other nodes are not consulted.
+ @Override
+ public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException {
+ return localPartition.getFlowFile(flowFileUuid);
+ }
+
+ // Back-pressure on this load-balanced queue is currently always propagated across the cluster.
+ @Override
+ public boolean isPropagateBackpressureAcrossNodes() {
+ // TODO: We will want to modify this when we have the ability to offload flowfiles from a node.
+ return true;
+ }
+
+    /**
+     * Permanently removes the given expired FlowFiles: marks their repository records for deletion,
+     * emits an EXPIRE provenance event for each, releases their content claims, and shrinks this
+     * queue's size accordingly. A failure to update the FlowFile Repository is logged but not
+     * rethrown; the FlowFiles may re-appear after restart and will then expire again.
+     */
+    @Override
+    public void handleExpiredRecords(final Collection<FlowFileRecord> expired) {
+        if (expired == null || expired.isEmpty()) {
+            return;
+        }
+
+        // SLF4J accepts varargs directly; no need to wrap arguments in new Object[] {...}.
+        logger.info("{} {} FlowFiles have expired and will be removed", this, expired.size());
+        final List<RepositoryRecord> expiredRecords = new ArrayList<>(expired.size());
+        final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(expired.size());
+
+        for (final FlowFileRecord flowFile : expired) {
+            final StandardRepositoryRecord record = new StandardRepositoryRecord(this, flowFile);
+            record.markForDelete();
+            expiredRecords.add(record);
+
+            final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
+                .fromFlowFile(flowFile)
+                .setEventType(ProvenanceEventType.EXPIRE)
+                .setDetails("Expiration Threshold = " + getFlowFileExpiration())
+                .setComponentType("Load-Balanced Connection")
+                .setComponentId(getIdentifier())
+                .setEventTime(System.currentTimeMillis());
+
+            final ContentClaim contentClaim = flowFile.getContentClaim();
+            if (contentClaim != null) {
+                // The content was not modified by expiration, so current and previous claims are identical.
+                final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+                builder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                    contentClaim.getOffset() + flowFile.getContentClaimOffset(), flowFile.getSize());
+
+                builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
+                    contentClaim.getOffset() + flowFile.getContentClaimOffset(), flowFile.getSize());
+            }
+
+            provenanceEvents.add(builder.build());
+
+            final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
+            logger.info("{} terminated due to FlowFile expiration; life of FlowFile = {} ms", flowFile, flowFileLife);
+        }
+
+        try {
+            flowFileRepo.updateRepository(expiredRecords);
+
+            for (final RepositoryRecord expiredRecord : expiredRecords) {
+                contentRepo.decrementClaimantCount(expiredRecord.getCurrentClaim());
+            }
+
+            provRepo.registerEvents(provenanceEvents);
+
+            adjustSize(-expired.size(), -expired.stream().mapToLong(FlowFileRecord::getSize).sum());
+        } catch (final IOException e) {
+            // Message grammar fixed ("This FlowFiles" -> "These FlowFiles").
+            logger.warn("Encountered {} expired FlowFiles but failed to update FlowFile Repository. These FlowFiles may re-appear in the queue after NiFi is restarted and will be expired again at " +
+                "that point.", expiredRecords.size(), e);
+        }
+    }
+
+
+ // Listing is limited to the local partition; FlowFiles queued on other nodes are not included.
+ @Override
+ protected List<FlowFileRecord> getListableFlowFiles() {
+ return localPartition.getListableFlowFiles();
+ }
+
+    /**
+     * Drops FlowFiles from every partition (local and remote), aggregating the dropped
+     * count/bytes into the parent request and adjusting this queue's size as partitions report.
+     * Stops early if any partition-level drop is canceled or fails, propagating that state.
+     */
+    @Override
+    protected void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) {
+        partitionReadLock.lock();
+        try {
+            dropRequest.setOriginalSize(size());
+            dropRequest.setState(DropFlowFileState.DROPPING_FLOWFILES);
+
+            int droppedCount = 0;
+            long droppedBytes = 0;
+
+            try {
+                for (final QueuePartition partition : queuePartitions) {
+                    // NOTE(review): the child request id is derived from the *local* node identifier for
+                    // every partition; presumably only the parent id's uniqueness matters -- confirm.
+                    final DropFlowFileRequest partitionRequest = new DropFlowFileRequest(dropRequest.getRequestIdentifier() + "-" + localPartition.getNodeIdentifier());
+
+                    partition.dropFlowFiles(partitionRequest, requestor);
+
+                    adjustSize(-partitionRequest.getDroppedSize().getObjectCount(), -partitionRequest.getDroppedSize().getByteCount());
+
+                    // Accumulate totals once and publish them. (The original also computed an interim
+                    // setDroppedSize from the request's previous value, which this assignment immediately
+                    // overwrote -- that redundant computation has been removed.)
+                    droppedCount += partitionRequest.getDroppedSize().getObjectCount();
+                    droppedBytes += partitionRequest.getDroppedSize().getByteCount();
+
+                    dropRequest.setDroppedSize(new QueueSize(droppedCount, droppedBytes));
+                    dropRequest.setCurrentSize(size());
+
+                    if (partitionRequest.getState() == DropFlowFileState.CANCELED) {
+                        dropRequest.cancel();
+                        break;
+                    } else if (partitionRequest.getState() == DropFlowFileState.FAILURE) {
+                        dropRequest.setState(DropFlowFileState.FAILURE, partitionRequest.getFailureReason());
+                        break;
+                    }
+                }
+
+                if (dropRequest.getState() == DropFlowFileState.DROPPING_FLOWFILES) {
+                    dropRequest.setState(DropFlowFileState.COMPLETE);
+                }
+            } catch (final Exception e) {
+                logger.error("Failed to drop FlowFiles for {}", this, e);
+                dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.getMessage() + ". See log for more details.");
+            }
+        } finally {
+            partitionReadLock.unlock();
+        }
+    }
+
+ // Acquires the partition read lock so callers can hold the partition layout stable across calls.
+ @Override
+ public void lock() {
+ partitionReadLock.lock();
+ }
+
+ // Releases the partition read lock previously acquired via lock().
+ @Override
+ public void unlock() {
+ partitionReadLock.unlock();
+ }
+
+ // Reacts to cluster topology changes by updating this queue's node identifier set
+ // (and thereby its partitions) under the partition write lock.
+ private class ClusterEventListener implements ClusterTopologyEventListener {
+ @Override
+ public void onNodeAdded(final NodeIdentifier nodeId) {
+ partitionWriteLock.lock();
+ try {
+ final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers);
+ updatedNodeIds.add(nodeId);
+
+ logger.debug("Node Identifier {} added to cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds);
+ setNodeIdentifiers(updatedNodeIds, false);
+ } finally {
+ partitionWriteLock.unlock();
+ }
+ }
+
+ @Override
+ public void onNodeRemoved(final NodeIdentifier nodeId) {
+ partitionWriteLock.lock();
+ try {
+ final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers);
+ updatedNodeIds.remove(nodeId);
+
+ logger.debug("Node Identifier {} removed from cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds);
+ setNodeIdentifiers(updatedNodeIds, false);
+ } finally {
+ partitionWriteLock.unlock();
+ }
+ }
+
+ @Override
+ public void onLocalNodeIdentifierSet(final NodeIdentifier localNodeId) {
+ partitionWriteLock.lock();
+ try {
+ if (localNodeId == null) {
+ return;
+ }
+
+ logger.debug("Local Node Identifier set to {}; current partitions = {}", localNodeId, queuePartitions);
+
+ // If a partition exists for the local node id but is not a LocalQueuePartition,
+ // force a partition update so the local node is represented locally.
+ for (final QueuePartition partition : queuePartitions) {
+ final Optional<NodeIdentifier> nodeIdentifierOption = partition.getNodeIdentifier();
+ if (!nodeIdentifierOption.isPresent()) {
+ continue;
+ }
+
+ final NodeIdentifier nodeIdentifier = nodeIdentifierOption.get();
+ if (nodeIdentifier.equals(localNodeId)) {
+ if (partition instanceof LocalQueuePartition) {
+ logger.debug("{} Local Node Identifier set to {} and QueuePartition with this identifier is already a Local Queue Partition", SocketLoadBalancedFlowFileQueue.this,
+ localNodeId);
+ break;
+ }
+
+ logger.debug("{} Local Node Identifier set to {} and found Queue Partition {} with that Node Identifier. Will force update of partitions",
+ SocketLoadBalancedFlowFileQueue.this, localNodeId, partition);
+
+ setNodeIdentifiers(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers, true);
+ return;
+ }
+ }
+
+ logger.debug("{} Local Node Identifier set to {} but found no Queue Partition with that Node Identifier.", SocketLoadBalancedFlowFileQueue.this, localNodeId);
+ } finally {
+ partitionWriteLock.unlock();
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/TransactionThreshold.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/TransactionThreshold.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/TransactionThreshold.java
new file mode 100644
index 0000000..bdf8d9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/TransactionThreshold.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+/**
+ * Tracks how much data has been accumulated into the current load-balancing transaction
+ * and reports when the transaction has grown large enough to be completed.
+ */
+public interface TransactionThreshold {
+
+ /**
+ * Counts the given number of FlowFiles and bytes toward the threshold.
+ *
+ * @param flowFileCount the number of FlowFiles to add to the running total
+ * @param flowFileSize the number of bytes to add to the running total
+ */
+ void adjust(int flowFileCount, long flowFileSize);
+
+ /**
+ * @return <code>true</code> if enough FlowFiles/bytes have accumulated that the transaction
+ * should be completed, <code>false</code> otherwise
+ */
+ boolean isThresholdMet();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/TransferFailureDestination.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/TransferFailureDestination.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/TransferFailureDestination.java
new file mode 100644
index 0000000..0a0648f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/TransferFailureDestination.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.Collection;
+import java.util.function.Function;
+
+/**
+ * A destination to which FlowFiles are transferred when they cannot be delivered to the
+ * partition that was originally chosen for them.
+ */
+public interface TransferFailureDestination {
+ /**
+ * Puts all of the given FlowFiles to the appropriate destination queue
+ *
+ * @param flowFiles the FlowFiles to transfer
+ * @param partitionerUsed the partitioner that was used to determine that the given FlowFiles should be grouped together in the first place
+ */
+ void putAll(Collection<FlowFileRecord> flowFiles, FlowFilePartitioner partitionerUsed);
+
+ /**
+ * Puts all of the given FlowFile Queue Contents to the appropriate destination queue
+ *
+ * @param queueContents a function that returns the FlowFileQueueContents, given a Partition Name
+ * @param partitionerUsed the partitioner that was used to determine that the given FlowFiles should be grouped together in the first place
+ */
+ void putAll(Function<String, FlowFileQueueContents> queueContents, FlowFilePartitioner partitionerUsed);
+
+ /**
+ * Indicates whether or not FlowFiles will need to be rebalanced when transferred to the destination.
+ *
+ * @param partitionerUsed the partitioner that was used to determine that FlowFiles should be grouped together in the first place
+ * @return <code>true</code> if FlowFiles will be rebalanced when transferred, <code>false</code> otherwise
+ */
+ boolean isRebalanceOnFailure(FlowFilePartitioner partitionerUsed);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/LoadBalanceFlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/LoadBalanceFlowFileCodec.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/LoadBalanceFlowFileCodec.java
new file mode 100644
index 0000000..cce2730
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/LoadBalanceFlowFileCodec.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Serializes a FlowFile to an OutputStream so that it can be transferred to another node
+ * for load balancing. Implementations define the wire format.
+ */
+public interface LoadBalanceFlowFileCodec {
+ /**
+ * Writes the given FlowFile to the given OutputStream.
+ *
+ * @param flowFile the FlowFile to encode
+ * @param out the stream to write the encoded form to
+ * @throws IOException if unable to write to the stream
+ */
+ void encode(FlowFileRecord flowFile, OutputStream out) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java
new file mode 100644
index 0000000..8e9b165
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Standard implementation of {@link LoadBalanceFlowFileCodec}. Wire format: attribute count
+ * (int), then for each attribute a length-prefixed UTF-8 key and value, then the lineage
+ * start date (long) and entry date (long).
+ */
+public class StandardLoadBalanceFlowFileCodec implements LoadBalanceFlowFileCodec {
+
+    @Override
+    public void encode(final FlowFileRecord flowFile, final OutputStream destination) throws IOException {
+        final DataOutputStream dataOut = new DataOutputStream(destination);
+        final Map<String, String> attributes = flowFile.getAttributes();
+
+        dataOut.writeInt(attributes.size());
+        for (final Map.Entry<String, String> attribute : attributes.entrySet()) {
+            writeString(attribute.getKey(), dataOut);
+            writeString(attribute.getValue(), dataOut);
+        }
+
+        dataOut.writeLong(flowFile.getLineageStartDate());
+        dataOut.writeLong(flowFile.getEntryDate());
+    }
+
+    // Writes a length-prefixed (int) UTF-8 encoding of the given String.
+    private void writeString(final String value, final DataOutputStream out) throws IOException {
+        final byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
+        out.writeInt(utf8.length);
+        out.write(utf8);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
new file mode 100644
index 0000000..1bb4053
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client.async;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.io.IOException;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
+
+/**
+ * A client responsible for asynchronously sending FlowFiles to one particular node in the
+ * cluster on behalf of registered connections.
+ */
+public interface AsyncLoadBalanceClient {
+
+ // The node that this client communicates with.
+ NodeIdentifier getNodeIdentifier();
+
+ void start();
+
+ void stop();
+
+ /**
+ * Registers a connection with this client. The suppliers/callbacks provide the FlowFiles to
+ * send, report success or failure of a transaction, and expose the connection's current
+ * compression and back-pressure settings.
+ */
+ void register(String connectionId, BooleanSupplier emptySupplier, Supplier<FlowFileRecord> flowFileSupplier,
+ TransactionFailureCallback failureCallback, TransactionCompleteCallback successCallback,
+ Supplier<LoadBalanceCompression> compressionSupplier, BooleanSupplier honorBackpressureSupplier);
+
+ // Removes a previously registered connection from this client.
+ void unregister(String connectionId);
+
+ boolean isRunning();
+
+ // NOTE(review): presumably indicates the client is temporarily backing off after a failure -- confirm with implementations.
+ boolean isPenalized();
+
+ // Notifies the client that its node has been disconnected from the cluster.
+ void nodeDisconnected();
+
+ boolean communicate() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClientFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClientFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClientFactory.java
new file mode 100644
index 0000000..20a4db2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClientFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client.async;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Factory for creating {@link AsyncLoadBalanceClient} instances, one per cluster node.
+ */
+public interface AsyncLoadBalanceClientFactory {
+ /**
+ * @param nodeIdentifier the node the client will communicate with
+ * @return a new client for the given node
+ */
+ AsyncLoadBalanceClient createClient(NodeIdentifier nodeIdentifier);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClientRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClientRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClientRegistry.java
new file mode 100644
index 0000000..49e6aed
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClientRegistry.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client.async;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
+
+/**
+ * Registry through which connections register (and unregister) themselves for asynchronous
+ * load balancing of FlowFiles to a particular node.
+ */
+public interface AsyncLoadBalanceClientRegistry {
+ /**
+ * Registers the given connection for load balancing to the given node. The suppliers/callbacks
+ * provide the FlowFiles to send, report transaction success or failure, and expose the
+ * connection's current compression and back-pressure settings.
+ */
+ void register(String connectionId, NodeIdentifier nodeId, BooleanSupplier emptySupplier, Supplier<FlowFileRecord> flowFileSupplier, TransactionFailureCallback failureCallback,
+ TransactionCompleteCallback successCallback, Supplier<LoadBalanceCompression> compressionSupplier, BooleanSupplier honorBackpressureSupplier);
+
+ // Removes the given connection's registration for the given node.
+ void unregister(String connectionId, NodeIdentifier nodeId);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java
new file mode 100644
index 0000000..0c8f8b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client.async;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.List;
+
+/**
+ * Callback that is invoked when a load-balancing transaction completes successfully.
+ */
+public interface TransactionCompleteCallback {
+ /**
+ * @param flowFilesSent the FlowFiles that were sent in the completed transaction
+ */
+ void onTransactionComplete(List<FlowFileRecord> flowFilesSent);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionFailureCallback.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionFailureCallback.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionFailureCallback.java
new file mode 100644
index 0000000..6d1a342
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionFailureCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client.async;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.List;
+
+/**
+ * Callback that is invoked when a load-balancing transaction fails, identifying the phase
+ * in which the failure occurred and, optionally, its cause.
+ */
+public interface TransactionFailureCallback {
+ // Convenience overload for failures with no known cause; delegates with a null Exception.
+ default void onTransactionFailed(final List<FlowFileRecord> flowFiles, final TransactionPhase transactionPhase) {
+ onTransactionFailed(flowFiles, null, transactionPhase);
+ }
+
+ /**
+ * @param flowFiles the FlowFiles that were part of the failed transaction
+ * @param cause the cause of the failure, or <code>null</code> if none is known
+ * @param transactionPhase the phase of the transaction in which the failure occurred
+ */
+ void onTransactionFailed(List<FlowFileRecord> flowFiles, Exception cause, TransactionPhase transactionPhase);
+
+ /**
+ * @return <code>true</code> if FlowFiles from a failed transaction should be rebalanced, <code>false</code> otherwise
+ */
+ boolean isRebalanceOnFailure();
+
+ // The phase of a transaction during which a failure can occur.
+ enum TransactionPhase {
+ /**
+ * Failure occurred when connecting to the node
+ */
+ CONNECTING,
+
+ /**
+ * Failure occurred when sending data to the node
+ */
+ SENDING;
+ }
+}