You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/29 20:49:40 UTC
[nifi] 03/05: NIFI-7059: This closes #4007. Fixed bug that results
in priorities not properly being set in the
SocketLoadBalancedFlowFileQueue. Even though the queue's setPriorities
method was called,
the underlying may not have contained the localPartition. As a result,
when setPriorities() was called,
it did not properly delegate that call to . As a result,
the queue knew that the Prioritizers were set but the local queue did not
apply them. This happened due to a race condition between queue creatio
[...]
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit c40a8d4e5db407d72622dd94e33527d3441ecfae
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Jan 22 12:05:03 2020 -0500
NIFI-7059: This closes #4007. Fixed bug that results in priorities not properly being set in the SocketLoadBalancedFlowFileQueue. Even though the queue's setPriorities method was called, the underlying may not have contained the localPartition. As a result, when setPriorities() was called, it did not properly delegate that call to . As a result, the queue knew that the Prioritizers were set but the local queue did not apply them. This happened due to a race condition between queue cr [...]
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../coordination/node/NodeClusterCoordinator.java | 2 +
.../controller/queue/AbstractFlowFileQueue.java | 56 ++++++++---
.../nifi/controller/queue/QueuePrioritizer.java | 9 +-
.../clustered/SocketLoadBalancedFlowFileQueue.java | 22 +++--
.../TestSocketLoadBalancedFlowFileQueue.java | 104 ++++++++++++++++++---
5 files changed, 157 insertions(+), 36 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 888f970..e428046 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -259,6 +259,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
this.nodeId = nodeId;
nodeStatuses.computeIfAbsent(nodeId.getId(), id -> new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
eventListeners.forEach(listener -> listener.onLocalNodeIdentifierSet(nodeId));
+
+ storeState();
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index 8b29613..09dc670 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -44,6 +44,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class AbstractFlowFileQueue implements FlowFileQueue {
private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class);
@@ -62,6 +65,10 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
private String partitioningAttribute = null;
+ private final ReadWriteLock loadBalanceRWLock = new ReentrantReadWriteLock();
+ private final Lock loadBalanceReadLock = loadBalanceRWLock.readLock();
+ private final Lock loadBalanceWriteLock = loadBalanceRWLock.writeLock();
+
private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS;
@@ -423,32 +430,57 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
}
@Override
- public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
- if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
- throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
- }
+ public void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
+ loadBalanceWriteLock.lock();
+ try {
+ if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
+ throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
+ }
- this.loadBalanceStrategy = strategy;
- this.partitioningAttribute = partitioningAttribute;
+ this.loadBalanceStrategy = strategy;
+ this.partitioningAttribute = partitioningAttribute;
+ } finally {
+ loadBalanceWriteLock.unlock();
+ }
}
@Override
- public synchronized String getPartitioningAttribute() {
- return partitioningAttribute;
+ public String getPartitioningAttribute() {
+ loadBalanceReadLock.lock();
+ try {
+ return partitioningAttribute;
+ } finally {
+ loadBalanceReadLock.unlock();
+ }
}
@Override
- public synchronized LoadBalanceStrategy getLoadBalanceStrategy() {
- return loadBalanceStrategy;
+ public LoadBalanceStrategy getLoadBalanceStrategy() {
+ loadBalanceReadLock.lock();
+ try {
+ return loadBalanceStrategy;
+ } finally {
+ loadBalanceReadLock.unlock();
+ }
}
@Override
public synchronized void setLoadBalanceCompression(final LoadBalanceCompression compression) {
- this.compression = compression;
+ loadBalanceWriteLock.lock();
+ try {
+ this.compression = compression;
+ } finally {
+ loadBalanceWriteLock.unlock();
+ }
}
@Override
public synchronized LoadBalanceCompression getLoadBalanceCompression() {
- return compression;
+ loadBalanceReadLock.lock();
+ try {
+ return compression;
+ } finally {
+ loadBalanceReadLock.unlock();
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
index b78ccff..7c2c1a8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
@@ -17,15 +17,15 @@
package org.apache.nifi.controller.queue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-
public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializable {
private static final long serialVersionUID = 1L;
private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
@@ -68,6 +68,7 @@ public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializabl
final ContentClaim claim1 = f1.getContentClaim();
final ContentClaim claim2 = f2.getContentClaim();
+
// put the one without a claim first
if (claim1 == null && claim2 != null) {
return -1;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 23a3788..837a3f6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -145,19 +145,25 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
if (sortedNodeIdentifiers.isEmpty()) {
+ // No Node Identifiers are known yet. Just create the partitions using the local partition.
queuePartitions = new QueuePartition[] { localPartition };
} else {
- queuePartitions = new QueuePartition[sortedNodeIdentifiers.size()];
-
- for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
- final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
- if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
- queuePartitions[i] = localPartition;
- } else {
- queuePartitions[i] = createRemotePartition(nodeId);
+ // The node identifiers are known. Create the partitions using the local partition and 1 Remote Partition for each node
+ // that is not the local node identifier. If the Local Node Identifier is not yet known, that's okay. When it becomes known,
+ // the queuePartitions array will be recreated with the appropriate partitions.
+ final List<QueuePartition> partitionList = new ArrayList<>();
+ partitionList.add(localPartition);
+
+ final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier();
+ for (final NodeIdentifier nodeId : sortedNodeIdentifiers) {
+ if (nodeId.equals(localNodeId)) {
+ continue;
}
+
+ partitionList.add(createRemotePartition(nodeId));
}
+ queuePartitions = partitionList.toArray(new QueuePartition[0]);
}
partitioner = new LocalPartitionPartitioner();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index e7d521c..351429d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -37,6 +37,8 @@ import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.junit.Assert;
import org.junit.Before;
@@ -136,6 +138,78 @@ public class TestSocketLoadBalancedFlowFileQueue {
"localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
}
+
+ @Test
+ public void testPriorities() {
+ final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() {
+ @Override
+ public int compare(final FlowFile o1, final FlowFile o2) {
+ final int i1 = Integer.parseInt(o1.getAttribute("i"));
+ final int i2 = Integer.parseInt(o2.getAttribute("i"));
+ return Integer.compare(i1, i2);
+ }
+ };
+
+ queue.setPriorities(Collections.singletonList(iValuePrioritizer));
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ // Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.)
+ for (int i = 99; i >= 0; i--) {
+ attributes.put("i", String.valueOf(i));
+ final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L);
+ queue.put(flowFile);
+ }
+
+ for (int i=0; i < 100; i++) {
+ final FlowFileRecord polled = queue.poll(Collections.emptySet());
+ assertNotNull(polled);
+ assertEquals(String.valueOf(i), polled.getAttribute("i"));
+ }
+
+ assertNull(queue.poll(Collections.emptySet()));
+ }
+
+ @Test
+ public void testPrioritiesWhenSetBeforeLocalNodeIdDetermined() {
+ final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() {
+ @Override
+ public int compare(final FlowFile o1, final FlowFile o2) {
+ final int i1 = Integer.parseInt(o1.getAttribute("i"));
+ final int i2 = Integer.parseInt(o2.getAttribute("i"));
+ return Integer.compare(i1, i2);
+ }
+ };
+
+ final ProcessScheduler scheduler = mock(ProcessScheduler.class);
+ final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
+ when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
+
+ queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
+ contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
+ queue.setPriorities(Collections.singletonList(iValuePrioritizer));
+
+ when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
+ queue.setNodeIdentifiers(new HashSet<>(nodeIds), true);
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ // Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.)
+ for (int i = 99; i >= 0; i--) {
+ attributes.put("i", String.valueOf(i));
+ final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L);
+ queue.put(flowFile);
+ }
+
+ for (int i=0; i < 100; i++) {
+ final FlowFileRecord polled = queue.poll(Collections.emptySet());
+ assertNotNull(polled);
+ assertEquals(String.valueOf(i), polled.getAttribute("i"));
+ }
+
+ assertNull(queue.poll(Collections.emptySet()));
+ }
+
@Test
public void testBinsAccordingToPartitioner() {
final FlowFilePartitioner partitioner = new StaticFlowFilePartitioner(1);
@@ -395,8 +469,8 @@ public class TestSocketLoadBalancedFlowFileQueue {
assertPartitionSizes(expectedPartitionSizes);
}
- @Test(timeout = 100000)
- public void testLocalNodeIdentifierSet() throws InterruptedException {
+ @Test(timeout = 10000)
+ public void testDataInRemotePartitionForLocalIdIsMovedToLocalPartition() throws InterruptedException {
nodeIds.clear();
final NodeIdentifier id1 = createNodeIdentifier();
@@ -410,7 +484,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), mock(ProcessScheduler.class), flowFileRepo, provRepo,
- contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
+ contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
queue.setFlowFilePartitioner(new RoundRobinPartitioner());
@@ -421,22 +495,28 @@ public class TestSocketLoadBalancedFlowFileQueue {
queue.put(new MockFlowFileRecord(attributes, 0));
}
- for (int i=0; i < 3; i++) {
- assertEquals(2, queue.getPartition(i).size().getObjectCount());
- }
-
- assertEquals(0, queue.getLocalPartition().size().getObjectCount());
-
when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(id1);
clusterTopologyEventListener.onLocalNodeIdentifierSet(id1);
- assertPartitionSizes(new int[] {2, 2, 2});
+ assertEquals(6, queue.size().getObjectCount());
- while (queue.getLocalPartition().size().getObjectCount() != 2) {
- Thread.sleep(10L);
+ // Ensure that the partitions' object sizes add up to 6. This could take a short time because rebalancing will occur.
+ // So we wait in a loop.
+ while (true) {
+ int totalObjectCount = 0;
+ for (int i = 0; i < queue.getPartitionCount(); i++) {
+ totalObjectCount += queue.getPartition(i).size().getObjectCount();
+ }
+
+ if (totalObjectCount == 6) {
+ break;
+ }
}
+
+ assertEquals(3, queue.getPartitionCount());
}
+
private void assertPartitionSizes(final int[] expectedSizes) {
final int[] partitionSizes = new int[queue.getPartitionCount()];
while (!Arrays.equals(expectedSizes, partitionSizes)) {