You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/29 20:49:40 UTC

[nifi] 03/05: NIFI-7059: This closes #4007. Fixed bug that results in priorities not properly being set in the SocketLoadBalancedFlowFileQueue. Even though the queue's setPriorities method was called, the underlying queuePartitions array may not have contained the localPartition. As a result, when setPriorities() was called, it did not properly delegate that call to the localPartition. As a result, the queue knew that the Prioritizers were set but the local queue did not apply them. This happened due to a race condition between queue creation [...]

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit c40a8d4e5db407d72622dd94e33527d3441ecfae
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Jan 22 12:05:03 2020 -0500

    NIFI-7059: This closes #4007. Fixed bug that results in priorities not properly being set in the SocketLoadBalancedFlowFileQueue. Even though the queue's setPriorities method was called, the underlying queuePartitions array may not have contained the localPartition. As a result, when setPriorities() was called, it did not properly delegate that call to the localPartition. As a result, the queue knew that the Prioritizers were set but the local queue did not apply them. This happened due to a race condition between queue creation [...]
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../coordination/node/NodeClusterCoordinator.java  |   2 +
 .../controller/queue/AbstractFlowFileQueue.java    |  56 ++++++++---
 .../nifi/controller/queue/QueuePrioritizer.java    |   9 +-
 .../clustered/SocketLoadBalancedFlowFileQueue.java |  22 +++--
 .../TestSocketLoadBalancedFlowFileQueue.java       | 104 ++++++++++++++++++---
 5 files changed, 157 insertions(+), 36 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 888f970..e428046 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -259,6 +259,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         this.nodeId = nodeId;
         nodeStatuses.computeIfAbsent(nodeId.getId(), id -> new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
         eventListeners.forEach(listener -> listener.onLocalNodeIdentifierSet(nodeId));
+
+        storeState();
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index 8b29613..09dc670 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -44,6 +44,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public abstract class AbstractFlowFileQueue implements FlowFileQueue {
     private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class);
@@ -62,6 +65,10 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
     private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
     private String partitioningAttribute = null;
 
+    private final ReadWriteLock loadBalanceRWLock = new ReentrantReadWriteLock();
+    private final Lock loadBalanceReadLock = loadBalanceRWLock.readLock();
+    private final Lock loadBalanceWriteLock = loadBalanceRWLock.writeLock();
+
     private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS;
 
 
@@ -423,32 +430,57 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
     }
 
     @Override
-    public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
-        if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
-            throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
-        }
+    public void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
+        loadBalanceWriteLock.lock();
+        try {
+            if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
+                throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
+            }
 
-        this.loadBalanceStrategy = strategy;
-        this.partitioningAttribute = partitioningAttribute;
+            this.loadBalanceStrategy = strategy;
+            this.partitioningAttribute = partitioningAttribute;
+        } finally {
+            loadBalanceWriteLock.unlock();
+        }
     }
 
     @Override
-    public synchronized String getPartitioningAttribute() {
-        return partitioningAttribute;
+    public String getPartitioningAttribute() {
+        loadBalanceReadLock.lock();
+        try {
+            return partitioningAttribute;
+        } finally {
+            loadBalanceReadLock.unlock();
+        }
     }
 
     @Override
-    public synchronized LoadBalanceStrategy getLoadBalanceStrategy() {
-        return loadBalanceStrategy;
+    public LoadBalanceStrategy getLoadBalanceStrategy() {
+        loadBalanceReadLock.lock();
+        try {
+            return loadBalanceStrategy;
+        } finally {
+            loadBalanceReadLock.unlock();
+        }
     }
 
     @Override
     public synchronized void setLoadBalanceCompression(final LoadBalanceCompression compression) {
-        this.compression = compression;
+        loadBalanceWriteLock.lock();
+        try {
+            this.compression = compression;
+        } finally {
+            loadBalanceWriteLock.unlock();
+        }
     }
 
     @Override
     public synchronized LoadBalanceCompression getLoadBalanceCompression() {
-        return compression;
+        loadBalanceReadLock.lock();
+        try {
+            return compression;
+        } finally {
+            loadBalanceReadLock.unlock();
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
index b78ccff..7c2c1a8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
@@ -17,15 +17,15 @@
 
 package org.apache.nifi.controller.queue;
 
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-
 public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializable {
     private static final long serialVersionUID = 1L;
     private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
@@ -68,6 +68,7 @@ public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializabl
         final ContentClaim claim1 = f1.getContentClaim();
         final ContentClaim claim2 = f2.getContentClaim();
 
+
         // put the one without a claim first
         if (claim1 == null && claim2 != null) {
             return -1;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 23a3788..837a3f6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -145,19 +145,25 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
         sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
 
         if (sortedNodeIdentifiers.isEmpty()) {
+            // No Node Identifiers are known yet. Just create the partitions using the local partition.
             queuePartitions = new QueuePartition[] { localPartition };
         } else {
-            queuePartitions = new QueuePartition[sortedNodeIdentifiers.size()];
-
-            for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
-                final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
-                if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
-                    queuePartitions[i] = localPartition;
-                } else {
-                    queuePartitions[i] = createRemotePartition(nodeId);
+            // The node identifiers are known. Create the partitions using the local partition and 1 Remote Partition for each node
+            // that is not the local node identifier. If the Local Node Identifier is not yet known, that's okay. When it becomes known,
+            // the queuePartitions array will be recreated with the appropriate partitions.
+            final List<QueuePartition> partitionList = new ArrayList<>();
+            partitionList.add(localPartition);
+
+            final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier();
+            for (final NodeIdentifier nodeId : sortedNodeIdentifiers) {
+                if (nodeId.equals(localNodeId)) {
+                    continue;
                 }
+
+                partitionList.add(createRemotePartition(nodeId));
             }
 
+            queuePartitions = partitionList.toArray(new QueuePartition[0]);
         }
 
         partitioner = new LocalPartitionPartitioner();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index e7d521c..351429d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -37,6 +37,8 @@ import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.junit.Assert;
 import org.junit.Before;
@@ -136,6 +138,78 @@ public class TestSocketLoadBalancedFlowFileQueue {
             "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
     }
 
+
+    @Test
+    public void testPriorities() {
+        final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iValuePrioritizer));
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        // Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.)
+        for (int i = 99; i >= 0; i--) {
+            attributes.put("i", String.valueOf(i));
+            final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L);
+            queue.put(flowFile);
+        }
+
+        for (int i=0; i < 100; i++) {
+            final FlowFileRecord polled = queue.poll(Collections.emptySet());
+            assertNotNull(polled);
+            assertEquals(String.valueOf(i), polled.getAttribute("i"));
+        }
+
+        assertNull(queue.poll(Collections.emptySet()));
+    }
+
+    @Test
+    public void testPrioritiesWhenSetBeforeLocalNodeIdDetermined() {
+        final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        final ProcessScheduler scheduler = mock(ProcessScheduler.class);
+        final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
+        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
+
+        queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
+            contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
+        queue.setPriorities(Collections.singletonList(iValuePrioritizer));
+
+        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
+        queue.setNodeIdentifiers(new HashSet<>(nodeIds), true);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        // Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.)
+        for (int i = 99; i >= 0; i--) {
+            attributes.put("i", String.valueOf(i));
+            final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L);
+            queue.put(flowFile);
+        }
+
+        for (int i=0; i < 100; i++) {
+            final FlowFileRecord polled = queue.poll(Collections.emptySet());
+            assertNotNull(polled);
+            assertEquals(String.valueOf(i), polled.getAttribute("i"));
+        }
+
+        assertNull(queue.poll(Collections.emptySet()));
+    }
+
     @Test
     public void testBinsAccordingToPartitioner() {
         final FlowFilePartitioner partitioner = new StaticFlowFilePartitioner(1);
@@ -395,8 +469,8 @@ public class TestSocketLoadBalancedFlowFileQueue {
         assertPartitionSizes(expectedPartitionSizes);
     }
 
-    @Test(timeout = 100000)
-    public void testLocalNodeIdentifierSet() throws InterruptedException {
+    @Test(timeout = 10000)
+    public void testDataInRemotePartitionForLocalIdIsMovedToLocalPartition() throws InterruptedException {
         nodeIds.clear();
 
         final NodeIdentifier id1 = createNodeIdentifier();
@@ -410,7 +484,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
 
         final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
         queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), mock(ProcessScheduler.class), flowFileRepo, provRepo,
-                contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
+            contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
 
         queue.setFlowFilePartitioner(new RoundRobinPartitioner());
 
@@ -421,22 +495,28 @@ public class TestSocketLoadBalancedFlowFileQueue {
             queue.put(new MockFlowFileRecord(attributes, 0));
         }
 
-        for (int i=0; i < 3; i++) {
-            assertEquals(2, queue.getPartition(i).size().getObjectCount());
-        }
-
-        assertEquals(0, queue.getLocalPartition().size().getObjectCount());
-
         when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(id1);
         clusterTopologyEventListener.onLocalNodeIdentifierSet(id1);
 
-        assertPartitionSizes(new int[] {2, 2, 2});
+        assertEquals(6, queue.size().getObjectCount());
 
-        while (queue.getLocalPartition().size().getObjectCount() != 2) {
-            Thread.sleep(10L);
+        // Ensure that the partitions' object sizes add up to 6. This could take a short time because rebalancing will occur.
+        // So we wait in a loop.
+        while (true) {
+            int totalObjectCount = 0;
+            for (int i = 0; i < queue.getPartitionCount(); i++) {
+                totalObjectCount += queue.getPartition(i).size().getObjectCount();
+            }
+
+            if (totalObjectCount == 6) {
+                break;
+            }
         }
+
+        assertEquals(3, queue.getPartitionCount());
     }
 
+
     private void assertPartitionSizes(final int[] expectedSizes) {
         final int[] partitionSizes = new int[queue.getPartitionCount()];
         while (!Arrays.equals(expectedSizes, partitionSizes)) {