You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:08 UTC
[12/14] nifi git commit: NIFI-5516: Implement Load-Balanced
Connections Refactoring StandardFlowFileQueue to have an
AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added
documentation, cleaned up code some Refactored FlowFileQueue so th
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 1378d3b..fb06a15 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -16,24 +16,6 @@
*/
package org.apache.nifi.cluster.coordination.node;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
@@ -47,8 +29,12 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.services.FlowService;
+import org.apache.nifi.state.MockStateMap;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.revision.RevisionManager;
import org.junit.Assert;
@@ -58,11 +44,33 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
public class TestNodeClusterCoordinator {
private NodeClusterCoordinator coordinator;
private ClusterCoordinationProtocolSenderListener senderListener;
private List<NodeConnectionStatus> nodeStatuses;
+ private StateManagerProvider stateManagerProvider;
private NiFiProperties createProperties() {
final Map<String,String> addProps = new HashMap<>();
@@ -76,12 +84,18 @@ public class TestNodeClusterCoordinator {
senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
nodeStatuses = Collections.synchronizedList(new ArrayList<>());
+ stateManagerProvider = Mockito.mock(StateManagerProvider.class);
+
+ final StateManager stateManager = Mockito.mock(StateManager.class);
+ when(stateManager.getState(any(Scope.class))).thenReturn(new MockStateMap(Collections.emptyMap(), 1));
+ when(stateManagerProvider.getStateManager(anyString())).thenReturn(stateManager);
+
final EventReporter eventReporter = Mockito.mock(EventReporter.class);
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
- Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+ when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
- coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null) {
+ coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null, stateManagerProvider) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
nodeStatuses.add(updatedStatus);
@@ -90,7 +104,7 @@ public class TestNodeClusterCoordinator {
final FlowService flowService = Mockito.mock(FlowService.class);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>());
- Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
+ when(flowService.createDataFlow()).thenReturn(dataFlow);
coordinator.setFlowService(flowService);
}
@@ -130,14 +144,14 @@ public class TestNodeClusterCoordinator {
}
@Test
- public void testTryAgainIfNoFlowServiceSet() {
+ public void testTryAgainIfNoFlowServiceSet() throws IOException {
final ClusterCoordinationProtocolSenderListener senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
final EventReporter eventReporter = Mockito.mock(EventReporter.class);
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
- Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+ when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
- null, revisionManager, createProperties(), null) {
+ null, revisionManager, createProperties(), null, stateManagerProvider) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
@@ -150,7 +164,7 @@ public class TestNodeClusterCoordinator {
coordinator.setConnected(true);
- final ProtocolMessage protocolResponse = coordinator.handle(requestMsg);
+ final ProtocolMessage protocolResponse = coordinator.handle(requestMsg, Collections.emptySet());
assertNotNull(protocolResponse);
assertTrue(protocolResponse instanceof ConnectionResponseMessage);
@@ -164,7 +178,7 @@ public class TestNodeClusterCoordinator {
final ClusterCoordinationProtocolSenderListener senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
final AtomicReference<ReconnectionRequestMessage> requestRef = new AtomicReference<>();
- Mockito.when(senderListener.requestReconnection(Mockito.any(ReconnectionRequestMessage.class))).thenAnswer(new Answer<Object>() {
+ when(senderListener.requestReconnection(any(ReconnectionRequestMessage.class))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final ReconnectionRequestMessage msg = invocation.getArgumentAt(0, ReconnectionRequestMessage.class);
@@ -175,10 +189,10 @@ public class TestNodeClusterCoordinator {
final EventReporter eventReporter = Mockito.mock(EventReporter.class);
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
- Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+ when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
- null, revisionManager, createProperties(), null) {
+ null, revisionManager, createProperties(), null, stateManagerProvider) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
@@ -186,7 +200,7 @@ public class TestNodeClusterCoordinator {
final FlowService flowService = Mockito.mock(FlowService.class);
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>());
- Mockito.when(flowService.createDataFlowFromController()).thenReturn(dataFlow);
+ when(flowService.createDataFlowFromController()).thenReturn(dataFlow);
coordinator.setFlowService(flowService);
coordinator.setConnected(true);
@@ -232,7 +246,7 @@ public class TestNodeClusterCoordinator {
@Test(timeout = 5000)
public void testStatusChangesReplicated() throws InterruptedException, IOException {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
- Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+ when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
// Create a connection request message and send to the coordinator
final NodeIdentifier requestedNodeId = createNodeId(1);
@@ -397,7 +411,7 @@ public class TestNodeClusterCoordinator {
final NodeStatusChangeMessage msg = new NodeStatusChangeMessage();
msg.setNodeId(nodeId1);
msg.setNodeConnectionStatus(oldStatus);
- coordinator.handle(msg);
+ coordinator.handle(msg, Collections.emptySet());
// Ensure that no status change message was send
Thread.sleep(1000);
@@ -413,7 +427,7 @@ public class TestNodeClusterCoordinator {
final ConnectionRequestMessage crm = new ConnectionRequestMessage();
crm.setConnectionRequest(connectionRequest);
- final ProtocolMessage response = coordinator.handle(crm);
+ final ProtocolMessage response = coordinator.handle(crm, Collections.emptySet());
assertNotNull(response);
assertTrue(response instanceof ConnectionResponseMessage);
final ConnectionResponseMessage responseMessage = (ConnectionResponseMessage) response;
@@ -424,7 +438,7 @@ public class TestNodeClusterCoordinator {
final ConnectionRequestMessage crm2 = new ConnectionRequestMessage();
crm2.setConnectionRequest(conRequest2);
- final ProtocolMessage conflictingResponse = coordinator.handle(crm2);
+ final ProtocolMessage conflictingResponse = coordinator.handle(crm2, Collections.emptySet());
assertNotNull(conflictingResponse);
assertTrue(conflictingResponse instanceof ConnectionResponseMessage);
final ConnectionResponseMessage conflictingResponseMessage = (ConnectionResponseMessage) conflictingResponse;
@@ -446,7 +460,7 @@ public class TestNodeClusterCoordinator {
final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>()));
final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
requestMsg.setConnectionRequest(request);
- return coordinator.handle(requestMsg);
+ return coordinator.handle(requestMsg, Collections.emptySet());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
index 45a2e42..3980865 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
@@ -219,7 +219,7 @@ public class ClusterConnectionIT {
cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
final Node coordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
- final NodeIdentifier node4NotReallyInCluster = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9283, "localhost", 9284, "localhost", 9285, null, false, null);
+ final NodeIdentifier node4NotReallyInCluster = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9283, "localhost", 9284, "localhost", 9286, "localhost", 9285, null, false, null);
final Map<NodeIdentifier, NodeConnectionStatus> replacementStatuses = new HashMap<>();
replacementStatuses.put(node1.getIdentifier(), new NodeConnectionStatus(node1.getIdentifier(), DisconnectionCode.USER_DISCONNECTED));
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index e0d8a97..3133736 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -17,17 +17,6 @@
package org.apache.nifi.cluster.integration;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.bundle.Bundle;
@@ -73,6 +62,18 @@ import org.apache.nifi.web.revision.RevisionManager;
import org.junit.Assert;
import org.mockito.Mockito;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
public class Node {
private final NodeIdentifier nodeId;
private final NiFiProperties nodeProperties;
@@ -133,7 +134,7 @@ public class Node {
private static NodeIdentifier createNodeId() {
- return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null);
+ return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null);
}
/**
@@ -296,8 +297,13 @@ public class Node {
}
final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
- return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null,
- revisionManager, nodeProperties, protocolSender);
+ try {
+ return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null,
+ revisionManager, nodeProperties, protocolSender);
+ } catch (IOException e) {
+ Assert.fail(e.toString());
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy
index ffa3429..83d301b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy
@@ -63,6 +63,6 @@ class ConnectionEntityMergerSpec extends Specification {
}
def createNodeIdentifier(int id) {
- new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null)
+ new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, null, id * 10, 'stsaddr', id * 100, id * 1000, false, null)
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy
index bb1d595..3997bec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy
@@ -147,6 +147,6 @@ class ControllerServiceEntityMergerSpec extends Specification {
}
def createNodeIdentifier(int id) {
- new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null)
+ new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, null, id * 10, 'stsaddr', id * 100, id * 1000, false, null)
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy
index 028c864..0a485b2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy
@@ -55,6 +55,6 @@ class LabelEntityMergerSpec extends Specification {
}
def createNodeIdentifier(int id) {
- new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null)
+ new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, null, id * 10, 'stsaddr', id * 100, id * 1000, false, null)
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
deleted file mode 100644
index 6f55e79..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller;
-
-import org.apache.nifi.controller.queue.DropFlowFileState;
-import org.apache.nifi.controller.queue.DropFlowFileStatus;
-import org.apache.nifi.controller.queue.QueueSize;
-
-public class DropFlowFileRequest implements DropFlowFileStatus {
- private final String identifier;
- private final long submissionTime = System.currentTimeMillis();
-
- private volatile QueueSize originalSize;
- private volatile QueueSize currentSize;
- private volatile QueueSize droppedSize = new QueueSize(0, 0L);
- private volatile long lastUpdated = System.currentTimeMillis();
- private volatile String failureReason;
-
- private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
-
-
- public DropFlowFileRequest(final String identifier) {
- this.identifier = identifier;
- }
-
- @Override
- public String getRequestIdentifier() {
- return identifier;
- }
-
- @Override
- public long getRequestSubmissionTime() {
- return submissionTime;
- }
-
- @Override
- public QueueSize getOriginalSize() {
- return originalSize;
- }
-
- void setOriginalSize(final QueueSize originalSize) {
- this.originalSize = originalSize;
- }
-
- @Override
- public QueueSize getCurrentSize() {
- return currentSize;
- }
-
- void setCurrentSize(final QueueSize queueSize) {
- this.currentSize = queueSize;
- }
-
- @Override
- public QueueSize getDroppedSize() {
- return droppedSize;
- }
-
- void setDroppedSize(final QueueSize droppedSize) {
- this.droppedSize = droppedSize;
- }
-
- @Override
- public synchronized DropFlowFileState getState() {
- return state;
- }
-
- @Override
- public long getLastUpdated() {
- return lastUpdated;
- }
-
- @Override
- public String getFailureReason() {
- return failureReason;
- }
-
- synchronized void setState(final DropFlowFileState state) {
- setState(state, null);
- }
-
- synchronized void setState(final DropFlowFileState state, final String explanation) {
- this.state = state;
- this.failureReason = explanation;
- this.lastUpdated = System.currentTimeMillis();
- }
-
- synchronized boolean cancel() {
- if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
- return false;
- }
-
- this.state = DropFlowFileState.CANCELED;
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
new file mode 100644
index 0000000..69a0b92
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public class DropFlowFileRequest implements DropFlowFileStatus {
+ private final String identifier;
+ private final long submissionTime = System.currentTimeMillis();
+
+ private volatile QueueSize originalSize;
+ private volatile QueueSize currentSize;
+ private volatile QueueSize droppedSize = new QueueSize(0, 0L);
+ private volatile long lastUpdated = System.currentTimeMillis();
+ private volatile String failureReason;
+
+ private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
+
+
+ public DropFlowFileRequest(final String identifier) {
+ this.identifier = identifier;
+ }
+
+ @Override
+ public String getRequestIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public long getRequestSubmissionTime() {
+ return submissionTime;
+ }
+
+ @Override
+ public QueueSize getOriginalSize() {
+ return originalSize;
+ }
+
+ public void setOriginalSize(final QueueSize originalSize) {
+ this.originalSize = originalSize;
+ }
+
+ @Override
+ public QueueSize getCurrentSize() {
+ return currentSize;
+ }
+
+ public void setCurrentSize(final QueueSize queueSize) {
+ this.currentSize = queueSize;
+ }
+
+ @Override
+ public QueueSize getDroppedSize() {
+ return droppedSize;
+ }
+
+ public void setDroppedSize(final QueueSize droppedSize) {
+ this.droppedSize = droppedSize;
+ }
+
+ @Override
+ public synchronized DropFlowFileState getState() {
+ return state;
+ }
+
+ @Override
+ public long getLastUpdated() {
+ return lastUpdated;
+ }
+
+ @Override
+ public String getFailureReason() {
+ return failureReason;
+ }
+
+ public synchronized void setState(final DropFlowFileState state) {
+ setState(state, null);
+ }
+
+ public synchronized void setState(final DropFlowFileState state, final String explanation) {
+ this.state = state;
+ this.failureReason = explanation;
+ this.lastUpdated = System.currentTimeMillis();
+ }
+
+ public synchronized boolean cancel() {
+ if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
+ return false;
+ }
+
+ this.state = DropFlowFileState.CANCELED;
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
index 5aeb5c5..b63be53 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
@@ -18,6 +18,8 @@ package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.claim.ContentClaim;
+import java.util.Optional;
+
/**
*
*/
@@ -25,23 +27,37 @@ public class ContentNotFoundException extends RuntimeException {
private static final long serialVersionUID = 19048239082L;
private final transient ContentClaim claim;
+ private final transient FlowFileRecord flowFile;
public ContentNotFoundException(final ContentClaim claim) {
super("Could not find content for " + claim);
this.claim = claim;
+ this.flowFile = null;
}
public ContentNotFoundException(final ContentClaim claim, final Throwable t) {
super("Could not find content for " + claim, t);
this.claim = claim;
+ this.flowFile = null;
}
public ContentNotFoundException(final ContentClaim claim, final String message) {
super("Could not find content for " + claim + ": " + message);
this.claim = claim;
+ this.flowFile = null;
+ }
+
+ public ContentNotFoundException(final FlowFileRecord flowFile, final ContentClaim claim, final String message) {
+ super("Could not find content for " + claim + ": " + message);
+ this.claim = claim;
+ this.flowFile = flowFile;
}
public ContentClaim getMissingClaim() {
return claim;
}
+
+ public Optional<FlowFileRecord> getFlowFile() {
+ return Optional.ofNullable(flowFile);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 6172874..fe60585 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -29,18 +29,16 @@ import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueueFactory;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.scheduling.SchedulingStrategy;
import java.util.ArrayList;
import java.util.Collection;
@@ -60,7 +58,7 @@ import java.util.stream.Collectors;
* one or more relationships that map the source component to the destination
* component.
*/
-public final class StandardConnection implements Connection {
+public final class StandardConnection implements Connection, ConnectionEventListener {
private final String id;
private final AtomicReference<ProcessGroup> processGroup;
@@ -69,13 +67,16 @@ public final class StandardConnection implements Connection {
private final Connectable source;
private final AtomicReference<Connectable> destination;
private final AtomicReference<Collection<Relationship>> relationships;
- private final StandardFlowFileQueue flowFileQueue;
private final AtomicInteger labelIndex = new AtomicInteger(1);
private final AtomicLong zIndex = new AtomicLong(0L);
private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
private final ProcessScheduler scheduler;
+ private final FlowFileQueueFactory flowFileQueueFactory;
+ private final boolean clustered;
private final int hashCode;
+ private volatile FlowFileQueue flowFileQueue;
+
private StandardConnection(final Builder builder) {
id = builder.id;
name = new AtomicReference<>(builder.name);
@@ -85,9 +86,10 @@ public final class StandardConnection implements Connection {
destination = new AtomicReference<>(builder.destination);
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
scheduler = builder.scheduler;
- flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
- scheduler, builder.swapManager, builder.eventReporter, builder.queueSwapThreshold,
- builder.defaultBackPressureObjectThreshold, builder.defaultBackPressureDataSizeThreshold);
+ flowFileQueueFactory = builder.flowFileQueueFactory;
+ clustered = builder.clustered;
+
+ flowFileQueue = flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this);
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
}
@@ -148,6 +150,20 @@ public final class StandardConnection implements Connection {
}
@Override
+ public void triggerDestinationEvent() {
+ if (getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+ scheduler.registerEvent(getDestination());
+ }
+ }
+
+ @Override
+ public void triggerSourceEvent() {
+ if (getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+ scheduler.registerEvent(getSource());
+ }
+ }
+
+ @Override
public Authorizable getSourceAuthorizable() {
final Connectable sourceConnectable = getSource();
final Authorizable sourceAuthorizable;
@@ -297,7 +313,7 @@ public final class StandardConnection implements Connection {
throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
}
- if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 0) {
+ if (getFlowFileQueue().isUnacknowledgedFlowFile()) {
throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination);
}
@@ -354,7 +370,7 @@ public final class StandardConnection implements Connection {
@Override
public String toString() {
- return "Connection[Source ID=" + id + ",Dest ID=" + getDestination().getIdentifier() + "]";
+ return "Connection[ID=" + getIdentifier() + ", Source ID=" + getSource().getIdentifier() + ", Dest ID=" + getDestination().getIdentifier() + "]";
}
/**
@@ -386,14 +402,8 @@ public final class StandardConnection implements Connection {
private Connectable source;
private Connectable destination;
private Collection<Relationship> relationships;
- private FlowFileSwapManager swapManager;
- private EventReporter eventReporter;
- private FlowFileRepository flowFileRepository;
- private ProvenanceEventRepository provenanceRepository;
- private ResourceClaimManager resourceClaimManager;
- private int queueSwapThreshold;
- private Long defaultBackPressureObjectThreshold;
- private String defaultBackPressureDataSizeThreshold;
+ private FlowFileQueueFactory flowFileQueueFactory;
+ private boolean clustered = false;
public Builder(final ProcessScheduler scheduler) {
this.scheduler = scheduler;
@@ -440,43 +450,13 @@ public final class StandardConnection implements Connection {
return this;
}
- public Builder swapManager(final FlowFileSwapManager swapManager) {
- this.swapManager = swapManager;
- return this;
- }
-
- public Builder eventReporter(final EventReporter eventReporter) {
- this.eventReporter = eventReporter;
- return this;
- }
-
- public Builder flowFileRepository(final FlowFileRepository flowFileRepository) {
- this.flowFileRepository = flowFileRepository;
+ public Builder flowFileQueueFactory(final FlowFileQueueFactory flowFileQueueFactory) {
+ this.flowFileQueueFactory = flowFileQueueFactory;
return this;
}
- public Builder provenanceRepository(final ProvenanceEventRepository provenanceRepository) {
- this.provenanceRepository = provenanceRepository;
- return this;
- }
-
- public Builder resourceClaimManager(final ResourceClaimManager resourceClaimManager) {
- this.resourceClaimManager = resourceClaimManager;
- return this;
- }
-
- public Builder queueSwapThreshold(final int queueSwapThreshold) {
- this.queueSwapThreshold = queueSwapThreshold;
- return this;
- }
-
- public Builder defaultBackPressureObjectThreshold(final long defaultBackPressureObjectThreshold) {
- this.defaultBackPressureObjectThreshold = defaultBackPressureObjectThreshold;
- return this;
- }
-
- public Builder defaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
- this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
+ public Builder clustered(final boolean clustered) {
+ this.clustered = clustered;
return this;
}
@@ -487,17 +467,8 @@ public final class StandardConnection implements Connection {
if (destination == null) {
throw new IllegalStateException("Cannot build a Connection without a Destination");
}
- if (swapManager == null) {
- throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager");
- }
- if (flowFileRepository == null) {
- throw new IllegalStateException("Cannot build a Connection without a FlowFile Repository");
- }
- if (provenanceRepository == null) {
- throw new IllegalStateException("Cannot build a Connection without a Provenance Repository");
- }
- if (resourceClaimManager == null) {
- throw new IllegalStateException("Cannot build a Connection without a Resource Claim Manager");
+ if (flowFileQueueFactory == null) {
+ throw new IllegalStateException("Cannot build a Connection without a FlowFileQueueFactory");
}
if (relationships == null) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 208bbce..5f8f925 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -16,6 +16,26 @@
*/
package org.apache.nifi.controller;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
+import org.apache.nifi.controller.swap.SchemaSwapSerializer;
+import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
+import org.apache.nifi.controller.swap.SwapDeserializer;
+import org.apache.nifi.controller.swap.SwapSerializer;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -29,34 +49,19 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
-import org.apache.nifi.controller.repository.SwapSummary;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
-import org.apache.nifi.controller.swap.SchemaSwapSerializer;
-import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
-import org.apache.nifi.controller.swap.SwapDeserializer;
-import org.apache.nifi.controller.swap.SwapSerializer;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* <p>
@@ -66,9 +71,8 @@ import org.slf4j.LoggerFactory;
*/
public class FileSystemSwapManager implements FlowFileSwapManager {
- public static final int MINIMUM_SWAP_COUNT = 10000;
- private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
- private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
+ private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap");
+ private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part");
public static final int SWAP_ENCODING_VERSION = 10;
public static final String EVENT_CATEGORY = "Swap FlowFiles";
@@ -106,13 +110,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
this.flowFileRepository = initializationContext.getFlowFileRepository();
}
+
@Override
- public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue) throws IOException {
+ public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
if (toSwap == null || toSwap.isEmpty()) {
return null;
}
- final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString() + ".swap");
+ final String swapFilePrefix = System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString();
+ final String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName;
+ final String swapFileName = swapFileBaseName + ".swap";
+
+ final File swapFile = new File(storageDirectory, swapFileName);
final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
final String swapLocation = swapFile.getAbsolutePath();
@@ -185,8 +194,55 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
+ private String getOwnerQueueIdentifier(final File swapFile) {
+ final String[] splits = swapFile.getName().split("-");
+ if (splits.length > 6) {
+ final String queueIdentifier = splits[1] + "-" + splits[2] + "-" + splits[3] + "-" + splits[4] + "-" + splits[5];
+ return queueIdentifier;
+ }
+
+ return null;
+ }
+
+ private String getOwnerPartition(final File swapFile) {
+ final String filename = swapFile.getName();
+ final int indexOfDot = filename.indexOf(".");
+ if (indexOfDot < 1) {
+ return null;
+ }
+
+ final int lastIndexOfDot = filename.lastIndexOf(".");
+ if (lastIndexOfDot == indexOfDot) {
+ return null;
+ }
+
+ return filename.substring(indexOfDot + 1, lastIndexOfDot);
+ }
+
+ @Override
+ public Set<String> getSwappedPartitionNames(final FlowFileQueue queue) {
+ final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(final File dir, final String name) {
+ return SWAP_FILE_PATTERN.matcher(name).matches();
+ }
+ });
+
+ if (swapFiles == null) {
+ return Collections.emptySet();
+ }
+
+ final String queueId = queue.getIdentifier();
+
+ return Stream.of(swapFiles)
+ .filter(swapFile -> queueId.equals(getOwnerQueueIdentifier(swapFile)))
+ .map(this::getOwnerPartition)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ }
+
@Override
- public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException {
+ public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
@Override
public boolean accept(final File dir, final String name) {
@@ -212,15 +268,21 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
// split the filename by dashes. The old filenaming scheme was "<timestamp>-<randomuuid>.swap" but the new naming scheme is
- // "<timestamp>-<queue identifier>-<random uuid>.swap". If we have two dashes, then we can just check if the queue ID is equal
- // to the id of the queue given and if not we can just move on.
- final String[] splits = swapFile.getName().split("-");
- if (splits.length > 6) {
- final String queueIdentifier = splits[1] + "-" + splits[2] + "-" + splits[3] + "-" + splits[4] + "-" + splits[5];
- if (queueIdentifier.equals(flowFileQueue.getIdentifier())) {
- swapLocations.add(swapFile.getAbsolutePath());
+ // "<timestamp>-<queue identifier>-<random uuid>.[partition name.]swap".
+ final String ownerQueueId = getOwnerQueueIdentifier(swapFile);
+ if (ownerQueueId != null) {
+ if (!ownerQueueId.equals(flowFileQueue.getIdentifier())) {
+ continue;
+ }
+
+ if (partitionName != null) {
+ final String ownerPartition = getOwnerPartition(swapFile);
+ if (!partitionName.equals(ownerPartition)) {
+ continue;
+ }
}
+ swapLocations.add(swapFile.getAbsolutePath());
continue;
}
@@ -357,4 +419,28 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
}
+
+ @Override
+ public String changePartitionName(final String swapLocation, final String newPartitionName) throws IOException {
+ final File existingFile = new File(swapLocation);
+ if (!existingFile.exists()) {
+ throw new FileNotFoundException("Could not change name of partition for swap location " + swapLocation + " because no swap file exists at that location");
+ }
+
+ final String existingFilename = existingFile.getName();
+
+ final String newFilename;
+ final int dotIndex = existingFilename.indexOf(".");
+ if (dotIndex < 0) {
+ newFilename = existingFilename + "." + newPartitionName + ".swap";
+ } else {
+ newFilename = existingFilename.substring(0, dotIndex) + "." + newPartitionName + ".swap";
+ }
+
+ final File newFile = new File(existingFile.getParentFile(), newFilename);
+ // Use Files.move and convert to Path's instead of File.rename so that we get an IOException on failure that describes why we failed.
+ Files.move(existingFile.toPath(), newFile.toPath());
+
+ return newFile.getAbsolutePath();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 21c61e9..ebd809c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -35,6 +35,8 @@ import org.apache.nifi.authorization.resource.DataAuthorizable;
import org.apache.nifi.authorization.resource.ProvenanceDataAuthorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.util.IdentityMapping;
+import org.apache.nifi.authorization.util.IdentityMappingUtil;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
@@ -76,8 +78,23 @@ import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.label.StandardLabel;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
+import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueueFactory;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.clustered.ContentRepositoryFlowFileAccess;
+import org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientFactory;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientRegistry;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask;
+import org.apache.nifi.controller.queue.clustered.server.ClusterLoadBalanceAuthorizer;
+import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer;
+import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer;
+import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
+import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
@@ -243,6 +260,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@@ -324,6 +342,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final VariableRegistry variableRegistry;
private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>();
+ private final ConnectionLoadBalanceServer loadBalanceServer;
+ private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
+ private final FlowEngine loadBalanceClientThreadPool;
+ private final Set<NioAsyncLoadBalanceClientTask> loadBalanceClientTasks = new HashSet<>();
+
private final ConcurrentMap<String, ProcessorNode> allProcessors = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Connection> allConnections = new ConcurrentHashMap<>();
@@ -673,8 +696,40 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
leaderElectionManager.start();
heartbeatMonitor.start();
+
+ final InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress();
+ // Setup Load Balancing Server
+ final EventReporter eventReporter = createEventReporter(bulletinRepository);
+ final List<IdentityMapping> identityMappings = IdentityMappingUtil.getIdentityMappings(nifiProperties);
+ final LoadBalanceAuthorizer authorizeConnection = new ClusterLoadBalanceAuthorizer(clusterCoordinator, eventReporter);
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepository, provenanceRepository, this, authorizeConnection);
+
+ final int numThreads = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
+ final String timeoutPeriod = nifiProperties.getProperty(NiFiProperties.LOAD_BALANCE_COMMS_TIMEOUT, NiFiProperties.DEFAULT_LOAD_BALANCE_COMMS_TIMEOUT);
+ final int timeoutMillis = (int) FormatUtils.getTimeDuration(timeoutPeriod, TimeUnit.MILLISECONDS);
+
+ loadBalanceServer = new ConnectionLoadBalanceServer(loadBalanceAddress.getHostName(), loadBalanceAddress.getPort(), sslContext,
+ numThreads, loadBalanceProtocol, eventReporter, timeoutMillis);
+
+
+ final int connectionsPerNode = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_CONNECTIONS_PER_NODE, NiFiProperties.DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE);
+ final NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(contentRepository),
+ eventReporter, new StandardLoadBalanceFlowFileCodec());
+ loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode);
+
+ final int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
+ loadBalanceClientThreadPool = new FlowEngine(loadBalanceClientThreadCount, "Load-Balanced Client", true);
+
+ for (int i=0; i < loadBalanceClientThreadCount; i++) {
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(loadBalanceClientRegistry, clusterCoordinator, eventReporter);
+ loadBalanceClientTasks.add(clientTask);
+ loadBalanceClientThreadPool.submit(clientTask);
+ }
} else {
+ loadBalanceClientRegistry = null;
heartbeater = null;
+ loadBalanceServer = null;
+ loadBalanceClientThreadPool = null;
}
}
@@ -775,6 +830,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
listener.start();
}
+ if (loadBalanceServer != null) {
+ loadBalanceServer.start();
+ }
+
notifyComponentsConfigurationRestored();
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@@ -940,11 +999,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
startConnectablesAfterInitialization.clear();
startRemoteGroupPortsAfterInitialization.clear();
}
+
+ for (final Connection connection : getRootGroup().findAllConnections()) {
+ connection.getFlowFileQueue().startLoadBalancing();
+ }
} finally {
writeLock.unlock("onFlowInitialized");
}
}
+ public boolean isStartAfterInitialization(final Connectable component) {
+ return startConnectablesAfterInitialization.contains(component) || startRemoteGroupPortsAfterInitialization.contains(component);
+ }
+
private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
@@ -1040,20 +1107,35 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
swapManager.initialize(initializationContext);
}
- return builder.id(requireNonNull(id).intern())
+ final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
+ @Override
+ public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
+ final FlowFileQueue flowFileQueue;
+
+ if (clusterCoordinator == null) {
+ flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
+ eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold());
+ } else {
+ flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
+ clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
+
+ flowFileQueue.setBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold());
+ flowFileQueue.setBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold());
+ }
+
+ return flowFileQueue;
+ }
+ };
+
+ final Connection connection = builder.id(requireNonNull(id).intern())
.name(name == null ? null : name.intern())
.relationships(relationships)
.source(requireNonNull(source))
.destination(destination)
- .swapManager(swapManager)
- .queueSwapThreshold(nifiProperties.getQueueSwapThreshold())
- .defaultBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold())
- .defaultBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold())
- .eventReporter(eventReporter)
- .resourceClaimManager(resourceClaimManager)
- .flowFileRepository(flowFileRepository)
- .provenanceRepository(provenanceRepository)
+ .flowFileQueueFactory(flowFileQueueFactory)
.build();
+
+ return connection;
}
/**
@@ -1561,6 +1643,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
zooKeeperStateServer.shutdown();
}
+ if (loadBalanceClientThreadPool != null) {
+ loadBalanceClientThreadPool.shutdownNow();
+ }
+ loadBalanceClientTasks.forEach(NioAsyncLoadBalanceClientTask::stop);
+
// Trigger any processors' methods marked with @OnShutdown to be called
getRootGroup().shutdown();
@@ -1606,6 +1693,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
listener.stop();
}
+ if (loadBalanceServer != null) {
+ loadBalanceServer.stop();
+ }
+
+ if (loadBalanceClientRegistry != null) {
+ loadBalanceClientRegistry.stop();
+ }
+
if (processScheduler != null) {
processScheduler.shutdown();
}
@@ -2226,6 +2321,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
queue.setPriorities(newPrioritizers);
}
+ final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy();
+ if (loadBalanceStrategyName != null) {
+ final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
+ final String partitioningAttribute = connectionDTO.getLoadBalancePartitionAttribute();
+ queue.setLoadBalanceStrategy(loadBalanceStrategy, partitioningAttribute);
+ }
+
connection.setProcessGroup(group);
group.addConnection(connection);
}
@@ -2737,6 +2839,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public void onConnectionAdded(final Connection connection) {
allConnections.put(connection.getIdentifier(), connection);
+
+ if (isInitialized()) {
+ connection.getFlowFileQueue().startLoadBalancing();
+ }
}
public void onConnectionRemoved(final Connection connection) {
@@ -3494,6 +3600,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+ public void stopTransmitting(final RemoteGroupPort remoteGroupPort) {
+ writeLock.lock();
+ try {
+ if (initialized.get()) {
+ remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort);
+ } else {
+ startRemoteGroupPortsAfterInitialization.remove(remoteGroupPort);
+ }
+ } finally {
+ writeLock.unlock("stopTransmitting");
+ }
+ }
+
public void stopProcessor(final String parentGroupId, final String processorId) {
final ProcessGroup group = lookupGroup(parentGroupId);
final ProcessorNode node = group.getProcessor(processorId);
@@ -4344,10 +4463,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
leaderElectionManager.start();
stateManagerProvider.enableClusterProvider();
+ loadBalanceClientRegistry.start();
+
heartbeat();
} else {
stateManagerProvider.disableClusterProvider();
-
setPrimary(false);
}
@@ -4369,6 +4489,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+
+
/**
* @return true if this instance is the primary node in the cluster; false
* otherwise