Posted to commits@nifi.apache.org by bb...@apache.org on 2019/09/23 14:07:54 UTC

[nifi] branch master updated: NIFI-6696: Ensured that callbacks to RemoteQueuePartition do not attempt to obtain the Partition Read Lock. The Read Lock is not necessary as long as the 'partitioner' is volatile, because it doesn't matter whether or not the actual partitions themselves change, since the only partition that would be touched is the Rebalancing Partition, which is fixed. Obtaining the partition read lock can lead to a deadlock as outlined in the Jira description.

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c721a9e  NIFI-6696: Ensured that callbacks to RemoteQueuePartition do not attempt to obtain the Partition Read Lock. The Read Lock is not necessary as long as the 'partitioner' is volatile, because it doesn't matter whether or not the actual partitions themselves change, since the only partition that would be touched is the Rebalancing Partition, which is fixed. Obtaining the partition read lock can lead to a deadlock as outlined in the Jira description.
c721a9e is described below

commit c721a9ee5fddfefa6b4dc87afa1b6c45cb09d101
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Sep 20 15:05:25 2019 -0400

    NIFI-6696: Ensured that callbacks to RemoteQueuePartition do not attempt to obtain the Partition Read Lock. The Read Lock is not necessary as long as the 'partitioner' is volatile, because it doesn't matter whether or not the actual partitions themselves change, since the only partition that would be touched is the Rebalancing Partition, which is fixed. Obtaining the partition read lock can lead to a deadlock as outlined in the Jira description.
    
    This closes #3760.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../clustered/SocketLoadBalancedFlowFileQueue.java | 53 +++++++---------------
 1 file changed, 17 insertions(+), 36 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index f7b2f38..ae231f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -114,7 +114,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
     private final Lock partitionReadLock = partitionLock.readLock();
     private final Lock partitionWriteLock = partitionLock.writeLock();
     private QueuePartition[] queuePartitions;
-    private FlowFilePartitioner partitioner;
+    private volatile FlowFilePartitioner partitioner;
     private boolean stopped = true;
     private volatile boolean offloaded = false;
 
@@ -337,51 +337,32 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
                     return;
                 }
 
-                partitionReadLock.lock();
-                try {
-                    if (isRebalanceOnFailure(partitionerUsed)) {
-                        logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", flowFiles.size(), nodeId);
-                        rebalancingPartition.rebalance(flowFiles);
-                    } else {
-                        logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are", flowFiles.size(), nodeId,
-                            partitioner);
-                        partitionQueue.putAll(flowFiles);
-                    }
-                } finally {
-                    partitionReadLock.unlock();
+                if (isRebalanceOnFailure(partitionerUsed)) {
+                    logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", flowFiles.size(), nodeId);
+                    rebalancingPartition.rebalance(flowFiles);
+                } else {
+                    logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are",
+                        flowFiles.size(), nodeId, partitionerUsed);
+                    partitionQueue.putAll(flowFiles);
                 }
             }
 
             @Override
             public void putAll(final Function<String, FlowFileQueueContents> queueContentsFunction, final FlowFilePartitioner partitionerUsed) {
-                partitionReadLock.lock();
-                try {
-                    if (isRebalanceOnFailure(partitionerUsed)) {
-                        final FlowFileQueueContents contents = queueContentsFunction.apply(rebalancingPartition.getSwapPartitionName());
-                        rebalancingPartition.rebalance(contents);
-                        logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition",
-                            contents.getActiveFlowFiles().size(), contents.getSwapLocations().size(), nodeId);
-                    } else {
-                        logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are", nodeId,
-                            partitioner);
-                    }
-                } finally {
-                    partitionReadLock.unlock();
+                if (isRebalanceOnFailure(partitionerUsed)) {
+                    final FlowFileQueueContents contents = queueContentsFunction.apply(rebalancingPartition.getSwapPartitionName());
+                    rebalancingPartition.rebalance(contents);
+                    logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition",
+                        contents.getActiveFlowFiles().size(), contents.getSwapLocations().size(), nodeId);
+                } else {
+                    logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are",
+                        nodeId, partitionerUsed);
                 }
             }
 
             @Override
             public boolean isRebalanceOnFailure(final FlowFilePartitioner partitionerUsed) {
-                partitionReadLock.lock();
-                try {
-                    if (!partitionerUsed.equals(partitioner)) {
-                        return true;
-                    }
-
-                    return partitioner.isRebalanceOnFailure();
-                } finally {
-                    partitionReadLock.unlock();
-                }
+                return partitionerUsed.isRebalanceOnFailure() || !partitionerUsed.equals(partitioner);
             }
         };
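
The locking change described in the commit message can be illustrated in isolation. The following is only a minimal sketch, not the NiFi implementation: the class name VolatilePartitionerSketch, the nested Partitioner interface, and setPartitioner are hypothetical stand-ins, and only the one-line isRebalanceOnFailure check mirrors the patch. It assumes the partitioner reference is replaced under a write lock, while callbacks read the volatile field directly and therefore never wait on the partition lock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class VolatilePartitionerSketch {

    /** Hypothetical stand-in for FlowFilePartitioner; only the method used here. */
    public interface Partitioner {
        boolean isRebalanceOnFailure();
    }

    private final ReentrantReadWriteLock partitionLock = new ReentrantReadWriteLock();

    // volatile: a callback thread always sees the most recently written
    // reference, without having to acquire the read lock.
    private volatile Partitioner partitioner;

    public VolatilePartitionerSketch(final Partitioner initial) {
        this.partitioner = initial;
    }

    /** Writers still serialize on the write lock (assumed here for illustration). */
    public void setPartitioner(final Partitioner newPartitioner) {
        partitionLock.writeLock().lock();
        try {
            this.partitioner = newPartitioner;
        } finally {
            partitionLock.writeLock().unlock();
        }
    }

    /**
     * Lock-free check, same shape as the patched method: rebalance if the
     * partitioner that was used asks for it, or if it is no longer the
     * current partitioner.
     */
    public boolean isRebalanceOnFailure(final Partitioner partitionerUsed) {
        return partitionerUsed.isRebalanceOnFailure() || !partitionerUsed.equals(partitioner);
    }

    public static void main(final String[] args) {
        final Partitioner sticky = () -> false;      // FlowFiles stay where they are
        final Partitioner rebalancing = () -> true;  // FlowFiles may be rebalanced

        final VolatilePartitionerSketch queue = new VolatilePartitionerSketch(sticky);
        System.out.println(queue.isRebalanceOnFailure(sticky));
        System.out.println(queue.isRebalanceOnFailure(rebalancing));

        queue.setPartitioner(rebalancing);
        System.out.println(queue.isRebalanceOnFailure(sticky));
    }
}
```

Because the check performs a single volatile read, a callback invoked while another thread holds the write lock returns immediately instead of blocking, which is the property the commit relies on to avoid the deadlock described in the Jira.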