You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/29 20:51:15 UTC
[nifi] branch master updated: NIFI-7076: This closes #4024. Revert
"NIFI-6787 - Before: When checking if a load balanced connection queue is
full, we compare the totalSize.get() and getMaxQueueSize()."
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 37ef10e NIFI-7076: This closes #4024. Revert "NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize()."
37ef10e is described below
commit 37ef10e6fb8a65c9ae563756f7cb26515c1eea0c
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Jan 29 15:34:51 2020 -0500
NIFI-7076: This closes #4024. Revert "NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize()."
This reverts commit 773160958209a8a173b4f741517c4bda63a28e82.
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../clustered/SocketLoadBalancedFlowFileQueue.java | 14 +-
.../partition/AvailableSeekingPartitioner.java | 62 -------
.../TestRoundRobinFlowFileQueueBalancing.java | 203 ---------------------
3 files changed, 1 insertion(+), 278 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 837a3f6..e69daad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -37,7 +37,6 @@ import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.StandardQueueDiagnostics;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
-import org.apache.nifi.controller.queue.clustered.partition.AvailableSeekingPartitioner;
import org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
@@ -212,7 +211,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
partitioner = new CorrelationAttributePartitioner(partitioningAttribute);
break;
case ROUND_ROBIN:
- partitioner = new AvailableSeekingPartitioner(new RoundRobinPartitioner(), this::isFull);
+ partitioner = new RoundRobinPartitioner();
break;
case SINGLE_NODE:
partitioner = new FirstNodePartitioner();
@@ -516,17 +515,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
}
@Override
- public boolean isFull() {
- for (QueuePartition queuePartition : queuePartitions) {
- if (!isFull(queuePartition.size())) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
public boolean isActiveQueueEmpty() {
return localPartition.isActiveQueueEmpty();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
deleted file mode 100644
index 2ee81b7..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.queue.clustered.partition;
-
-import org.apache.nifi.controller.queue.QueueSize;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-
-import java.util.function.Function;
-
-public class AvailableSeekingPartitioner implements FlowFilePartitioner {
- private final FlowFilePartitioner partitionerDelegate;
- private final Function<QueueSize, Boolean> fullCheck;
-
- public AvailableSeekingPartitioner(FlowFilePartitioner partitionerDelegate, Function<QueueSize, Boolean> fullCheck) {
- this.partitionerDelegate = partitionerDelegate;
- this.fullCheck = fullCheck;
- }
-
- @Override
- public QueuePartition getPartition(FlowFileRecord flowFile, QueuePartition[] partitions, QueuePartition localPartition) {
- for (int attemptCounter = 0; attemptCounter < partitions.length; attemptCounter++) {
- QueuePartition selectedPartition = partitionerDelegate.getPartition(flowFile, partitions, localPartition);
-
- if (!fullCheck.apply(selectedPartition.size())) {
- return selectedPartition;
- }
- }
-
- // As we don't want to return null here, fall back to original logic if all partitions are full.
- return partitionerDelegate.getPartition(flowFile, partitions, localPartition);
- }
-
- @Override
- public boolean isRebalanceOnClusterResize() {
- return partitionerDelegate.isRebalanceOnClusterResize();
- }
-
- @Override
- public boolean isRebalanceOnFailure() {
- return partitionerDelegate.isRebalanceOnFailure();
- }
-
- @Override
- public boolean isPartitionStatic() {
- return partitionerDelegate.isPartitionStatic();
- }
-}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java
deleted file mode 100644
index 7037fa7..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.queue.clustered;
-
-import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.MockFlowFileRecord;
-import org.apache.nifi.controller.MockSwapManager;
-import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.queue.LoadBalanceStrategy;
-import org.apache.nifi.controller.queue.NopConnectionEventListener;
-import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
-import org.apache.nifi.controller.repository.ContentRepository;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.IntStream;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestRoundRobinFlowFileQueueBalancing {
-
- private Connection connection;
- private FlowFileRepository flowFileRepo;
- private ContentRepository contentRepo;
- private ProvenanceEventRepository provRepo;
- private ResourceClaimManager claimManager;
- private ClusterCoordinator clusterCoordinator;
- private MockSwapManager swapManager;
- private EventReporter eventReporter;
- private SocketLoadBalancedFlowFileQueue queue;
-
- private List<NodeIdentifier> nodeIds;
- private int nodePort = 4096;
-
- private NodeIdentifier localNodeIdentifier;
- private NodeIdentifier remoteNodeIdentifier1;
- private NodeIdentifier remoteNodeIdentifier2;
-
- private int backPressureObjectThreshold = 10;
-
- @Before
- public void setup() {
- MockFlowFileRecord.resetIdGenerator();
- connection = mock(Connection.class);
- when(connection.getIdentifier()).thenReturn("unit-test");
-
- flowFileRepo = mock(FlowFileRepository.class);
- contentRepo = mock(ContentRepository.class);
- provRepo = mock(ProvenanceEventRepository.class);
- claimManager = new StandardResourceClaimManager();
- clusterCoordinator = mock(ClusterCoordinator.class);
- swapManager = new MockSwapManager();
- eventReporter = EventReporter.NO_OP;
-
- localNodeIdentifier = createNodeIdentifier("00000000-0000-0000-0000-000000000000");
- remoteNodeIdentifier1 = createNodeIdentifier("11111111-1111-1111-1111-111111111111");
- remoteNodeIdentifier2 = createNodeIdentifier("22222222-2222-2222-2222-222222222222");
-
- nodeIds = new ArrayList<>();
- nodeIds.add(localNodeIdentifier);
- nodeIds.add(remoteNodeIdentifier1);
- nodeIds.add(remoteNodeIdentifier2);
-
- doAnswer((Answer<Set<NodeIdentifier>>) invocation -> new HashSet<>(nodeIds)).when(clusterCoordinator).getNodeIdentifiers();
- when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeIdentifier);
-
- final ProcessScheduler scheduler = mock(ProcessScheduler.class);
-
- final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
- queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
- contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
-
-
- queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null);
- queue.setBackPressureObjectThreshold(backPressureObjectThreshold);
- }
-
- private NodeIdentifier createNodeIdentifier(final String uuid) {
- return new NodeIdentifier(uuid, "localhost", nodePort++, "localhost", nodePort++,
- "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
- }
-
- @Test
- public void testIsFullShouldReturnFalseWhenLocalIsFullRemotesAreNot() {
- // GIVEN
- boolean expected = false;
- int[] expectedPartitionSizes = {10, 0, 0};
-
- // WHEN
- IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-
- // THEN
- testIsFull(expected, expectedPartitionSizes);
- }
-
- @Test
- public void testIsFullShouldReturnFalseWhenLocalAndOneRemoteIsFullOtherRemoteIsNot() {
- // GIVEN
- boolean expected = false;
- int[] expectedPartitionSizes = {10, 10, 0};
-
- // WHEN
- IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
- IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L)));
-
- // THEN
- testIsFull(expected, expectedPartitionSizes);
- }
-
- @Test
- public void testIsFullShouldReturnTrueWhenAllPartitionsAreFull() {
- // GIVEN
- boolean expected = true;
- int[] expectedPartitionSizes = {10, 10, 10};
-
- // WHEN
- IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
- IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L)));
- IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(2).put(new MockFlowFileRecord(0L)));
-
- // THEN
- testIsFull(expected, expectedPartitionSizes);
- }
-
- @Test
- public void testBalancingWhenAllPartitionsAreEmpty() {
- // GIVEN
- int[] expectedPartitionSizes = {3, 3, 3};
-
- // WHEN
- IntStream.rangeClosed(1, 9).forEach(__ -> queue.put(new MockFlowFileRecord(0L)));
-
- // THEN
- assertPartitionSizes(expectedPartitionSizes);
- }
-
- @Test
- public void testBalancingWhenLocalPartitionIsFull() {
- // GIVEN
- int[] expectedPartitionSizes = {10, 2, 2};
-
- IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-
- // WHEN
- IntStream.rangeClosed(1, 4).forEach(__ -> queue.put(new MockFlowFileRecord(0L)));
-
- // THEN
- assertPartitionSizes(expectedPartitionSizes);
- }
-
- private void testIsFull(boolean expected, int[] expectedPartitionSizes) {
- // GIVEN
-
- // WHEN
- boolean actual = queue.isFull();
-
- // THEN
- assertEquals(expected, actual);
- assertPartitionSizes(expectedPartitionSizes);
- }
-
- private void assertPartitionSizes(final int[] expectedSizes) {
- final int[] partitionSizes = new int[queue.getPartitionCount()];
-
- for (int i = 0; i < partitionSizes.length; i++) {
- partitionSizes[i] = queue.getPartition(i).size().getObjectCount();
- }
-
- assertArrayEquals(expectedSizes, partitionSizes);
- }
-}