You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/29 20:51:15 UTC

[nifi] branch master updated: NIFI-7076: This closes #4024. Revert "NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize()."

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 37ef10e  NIFI-7076: This closes #4024. Revert "NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize()."
37ef10e is described below

commit 37ef10e6fb8a65c9ae563756f7cb26515c1eea0c
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Jan 29 15:34:51 2020 -0500

    NIFI-7076: This closes #4024. Revert "NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize()."
    
    This reverts commit 773160958209a8a173b4f741517c4bda63a28e82.
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../clustered/SocketLoadBalancedFlowFileQueue.java |  14 +-
 .../partition/AvailableSeekingPartitioner.java     |  62 -------
 .../TestRoundRobinFlowFileQueueBalancing.java      | 203 ---------------------
 3 files changed, 1 insertion(+), 278 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 837a3f6..e69daad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -37,7 +37,6 @@ import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
 import org.apache.nifi.controller.queue.StandardQueueDiagnostics;
 import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
-import org.apache.nifi.controller.queue.clustered.partition.AvailableSeekingPartitioner;
 import org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner;
 import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner;
 import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
@@ -212,7 +211,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
                 partitioner = new CorrelationAttributePartitioner(partitioningAttribute);
                 break;
             case ROUND_ROBIN:
-                partitioner = new AvailableSeekingPartitioner(new RoundRobinPartitioner(), this::isFull);
+                partitioner = new RoundRobinPartitioner();
                 break;
             case SINGLE_NODE:
                 partitioner = new FirstNodePartitioner();
@@ -516,17 +515,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
     }
 
     @Override
-    public boolean isFull() {
-        for (QueuePartition queuePartition : queuePartitions) {
-            if (!isFull(queuePartition.size())) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
-    @Override
     public boolean isActiveQueueEmpty() {
         return localPartition.isActiveQueueEmpty();
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
deleted file mode 100644
index 2ee81b7..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.queue.clustered.partition;
-
-import org.apache.nifi.controller.queue.QueueSize;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-
-import java.util.function.Function;
-
-public class AvailableSeekingPartitioner implements FlowFilePartitioner {
-    private final FlowFilePartitioner partitionerDelegate;
-    private final Function<QueueSize, Boolean> fullCheck;
-
-    public AvailableSeekingPartitioner(FlowFilePartitioner partitionerDelegate, Function<QueueSize, Boolean> fullCheck) {
-        this.partitionerDelegate = partitionerDelegate;
-        this.fullCheck = fullCheck;
-    }
-
-    @Override
-    public QueuePartition getPartition(FlowFileRecord flowFile, QueuePartition[] partitions, QueuePartition localPartition) {
-        for (int attemptCounter = 0; attemptCounter < partitions.length; attemptCounter++) {
-            QueuePartition selectedPartition = partitionerDelegate.getPartition(flowFile, partitions, localPartition);
-
-            if (!fullCheck.apply(selectedPartition.size())) {
-                return selectedPartition;
-            }
-        }
-
-        // As we don't want to return null here, fall back to original logic if all partitions are full.
-        return partitionerDelegate.getPartition(flowFile, partitions, localPartition);
-    }
-
-    @Override
-    public boolean isRebalanceOnClusterResize() {
-        return partitionerDelegate.isRebalanceOnClusterResize();
-    }
-
-    @Override
-    public boolean isRebalanceOnFailure() {
-        return partitionerDelegate.isRebalanceOnFailure();
-    }
-
-    @Override
-    public boolean isPartitionStatic() {
-        return partitionerDelegate.isPartitionStatic();
-    }
-}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java
deleted file mode 100644
index 7037fa7..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.queue.clustered;
-
-import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.MockFlowFileRecord;
-import org.apache.nifi.controller.MockSwapManager;
-import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.queue.LoadBalanceStrategy;
-import org.apache.nifi.controller.queue.NopConnectionEventListener;
-import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
-import org.apache.nifi.controller.repository.ContentRepository;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.IntStream;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestRoundRobinFlowFileQueueBalancing {
-
-    private Connection connection;
-    private FlowFileRepository flowFileRepo;
-    private ContentRepository contentRepo;
-    private ProvenanceEventRepository provRepo;
-    private ResourceClaimManager claimManager;
-    private ClusterCoordinator clusterCoordinator;
-    private MockSwapManager swapManager;
-    private EventReporter eventReporter;
-    private SocketLoadBalancedFlowFileQueue queue;
-
-    private List<NodeIdentifier> nodeIds;
-    private int nodePort = 4096;
-
-    private NodeIdentifier localNodeIdentifier;
-    private NodeIdentifier remoteNodeIdentifier1;
-    private NodeIdentifier remoteNodeIdentifier2;
-
-    private int backPressureObjectThreshold = 10;
-
-    @Before
-    public void setup() {
-        MockFlowFileRecord.resetIdGenerator();
-        connection = mock(Connection.class);
-        when(connection.getIdentifier()).thenReturn("unit-test");
-
-        flowFileRepo = mock(FlowFileRepository.class);
-        contentRepo = mock(ContentRepository.class);
-        provRepo = mock(ProvenanceEventRepository.class);
-        claimManager = new StandardResourceClaimManager();
-        clusterCoordinator = mock(ClusterCoordinator.class);
-        swapManager = new MockSwapManager();
-        eventReporter = EventReporter.NO_OP;
-
-        localNodeIdentifier = createNodeIdentifier("00000000-0000-0000-0000-000000000000");
-        remoteNodeIdentifier1 = createNodeIdentifier("11111111-1111-1111-1111-111111111111");
-        remoteNodeIdentifier2 = createNodeIdentifier("22222222-2222-2222-2222-222222222222");
-
-        nodeIds = new ArrayList<>();
-        nodeIds.add(localNodeIdentifier);
-        nodeIds.add(remoteNodeIdentifier1);
-        nodeIds.add(remoteNodeIdentifier2);
-
-        doAnswer((Answer<Set<NodeIdentifier>>) invocation -> new HashSet<>(nodeIds)).when(clusterCoordinator).getNodeIdentifiers();
-        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeIdentifier);
-
-        final ProcessScheduler scheduler = mock(ProcessScheduler.class);
-
-        final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
-        queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
-                contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
-
-
-        queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null);
-        queue.setBackPressureObjectThreshold(backPressureObjectThreshold);
-    }
-
-    private NodeIdentifier createNodeIdentifier(final String uuid) {
-        return new NodeIdentifier(uuid, "localhost", nodePort++, "localhost", nodePort++,
-                "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
-    }
-
-    @Test
-    public void testIsFullShouldReturnFalseWhenLocalIsFullRemotesAreNot() {
-        // GIVEN
-        boolean expected = false;
-        int[] expectedPartitionSizes = {10, 0, 0};
-
-        // WHEN
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        testIsFull(expected, expectedPartitionSizes);
-    }
-
-    @Test
-    public void testIsFullShouldReturnFalseWhenLocalAndOneRemoteIsFullOtherRemoteIsNot() {
-        // GIVEN
-        boolean expected = false;
-        int[] expectedPartitionSizes = {10, 10, 0};
-
-        // WHEN
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        testIsFull(expected, expectedPartitionSizes);
-    }
-
-    @Test
-    public void testIsFullShouldReturnTrueWhenAllPartitionsAreFull() {
-        // GIVEN
-        boolean expected = true;
-        int[] expectedPartitionSizes = {10, 10, 10};
-
-        // WHEN
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L)));
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(2).put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        testIsFull(expected, expectedPartitionSizes);
-    }
-
-    @Test
-    public void testBalancingWhenAllPartitionsAreEmpty() {
-        // GIVEN
-        int[] expectedPartitionSizes = {3, 3, 3};
-
-        // WHEN
-        IntStream.rangeClosed(1, 9).forEach(__ -> queue.put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        assertPartitionSizes(expectedPartitionSizes);
-    }
-
-    @Test
-    public void testBalancingWhenLocalPartitionIsFull() {
-        // GIVEN
-        int[] expectedPartitionSizes = {10, 2, 2};
-
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-
-        // WHEN
-        IntStream.rangeClosed(1, 4).forEach(__ -> queue.put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        assertPartitionSizes(expectedPartitionSizes);
-    }
-
-    private void testIsFull(boolean expected, int[] expectedPartitionSizes) {
-        // GIVEN
-
-        // WHEN
-        boolean actual = queue.isFull();
-
-        // THEN
-        assertEquals(expected, actual);
-        assertPartitionSizes(expectedPartitionSizes);
-    }
-
-    private void assertPartitionSizes(final int[] expectedSizes) {
-        final int[] partitionSizes = new int[queue.getPartitionCount()];
-
-        for (int i = 0; i < partitionSizes.length; i++) {
-            partitionSizes[i] = queue.getPartition(i).size().getObjectCount();
-        }
-
-        assertArrayEquals(expectedSizes, partitionSizes);
-    }
-}