You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/09/23 14:07:54 UTC
[nifi] branch master updated: NIFI-6696: Ensured that callback to
RemoteQueuePartition do not attempt to obtain the Partition Read Lock. The
Read Lock is not necessary as long as the 'partitioner' is volatile,
because it doesn't matter whether or not the actual partitions themselves
change,
since the only partition that would be touched is the Rebalancing Partition,
on,
which is fixed. Obtaining the partition read lock can lead to a deadlock as
outlined in the Jira description.
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c721a9e NIFI-6696: Ensured that callback to RemoteQueuePartition do not attempt to obtain the Partition Read Lock. The Read Lock is not necessary as long as the 'partitioner' is volatile, because it doesn't matter whether or not the actual partitions themselves change, since the only partition that would be touched is the Rebalancing Partition,on, which is fixed. Obtaining the partition read lock can lead to a deadlock as outlined in the Jira description.
c721a9e is described below
commit c721a9ee5fddfefa6b4dc87afa1b6c45cb09d101
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Sep 20 15:05:25 2019 -0400
NIFI-6696: Ensured that callback to RemoteQueuePartition do not attempt to obtain the Partition Read Lock. The Read Lock is not necessary as long as the 'partitioner' is volatile, because it doesn't matter whether or not the actual partitions themselves change, since the only partition that would be touched is the Rebalancing Partition,on, which is fixed. Obtaining the partition read lock can lead to a deadlock as outlined in the Jira description.
This closes #3760.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../clustered/SocketLoadBalancedFlowFileQueue.java | 53 +++++++---------------
1 file changed, 17 insertions(+), 36 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index f7b2f38..ae231f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -114,7 +114,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
private final Lock partitionReadLock = partitionLock.readLock();
private final Lock partitionWriteLock = partitionLock.writeLock();
private QueuePartition[] queuePartitions;
- private FlowFilePartitioner partitioner;
+ private volatile FlowFilePartitioner partitioner;
private boolean stopped = true;
private volatile boolean offloaded = false;
@@ -337,51 +337,32 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
return;
}
- partitionReadLock.lock();
- try {
- if (isRebalanceOnFailure(partitionerUsed)) {
- logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", flowFiles.size(), nodeId);
- rebalancingPartition.rebalance(flowFiles);
- } else {
- logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are", flowFiles.size(), nodeId,
- partitioner);
- partitionQueue.putAll(flowFiles);
- }
- } finally {
- partitionReadLock.unlock();
+ if (isRebalanceOnFailure(partitionerUsed)) {
+ logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", flowFiles.size(), nodeId);
+ rebalancingPartition.rebalance(flowFiles);
+ } else {
+ logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are",
+ flowFiles.size(), nodeId, partitionerUsed);
+ partitionQueue.putAll(flowFiles);
}
}
@Override
public void putAll(final Function<String, FlowFileQueueContents> queueContentsFunction, final FlowFilePartitioner partitionerUsed) {
- partitionReadLock.lock();
- try {
- if (isRebalanceOnFailure(partitionerUsed)) {
- final FlowFileQueueContents contents = queueContentsFunction.apply(rebalancingPartition.getSwapPartitionName());
- rebalancingPartition.rebalance(contents);
- logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition",
- contents.getActiveFlowFiles().size(), contents.getSwapLocations().size(), nodeId);
- } else {
- logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are", nodeId,
- partitioner);
- }
- } finally {
- partitionReadLock.unlock();
+ if (isRebalanceOnFailure(partitionerUsed)) {
+ final FlowFileQueueContents contents = queueContentsFunction.apply(rebalancingPartition.getSwapPartitionName());
+ rebalancingPartition.rebalance(contents);
+ logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition",
+ contents.getActiveFlowFiles().size(), contents.getSwapLocations().size(), nodeId);
+ } else {
+ logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are",
+ nodeId, partitionerUsed);
}
}
@Override
public boolean isRebalanceOnFailure(final FlowFilePartitioner partitionerUsed) {
- partitionReadLock.lock();
- try {
- if (!partitionerUsed.equals(partitioner)) {
- return true;
- }
-
- return partitioner.isRebalanceOnFailure();
- } finally {
- partitionReadLock.unlock();
- }
+ return partitionerUsed.isRebalanceOnFailure() || !partitionerUsed.equals(partitioner);
}
};