You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/07/15 20:14:45 UTC
[2/3] nifi git commit: NIFI-1992: - Updated site-to-site client and
server to support clustered NiFi instances. NIFI-2274: - Ensuring we use the
correct URI when updating a connection.
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 1b410d6..d336558 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -766,10 +766,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return new ConnectionResponse(tryAgainSeconds);
}
- // TODO: Remove the 'null' values here from the ConnectionResponse all together. These
- // will no longer be needed for site-to-site once the NCM is gone.
- return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, null, null, null, instanceId,
- new ArrayList<>(nodeStatuses.values()),
+ return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, new ArrayList<>(nodeStatuses.values()),
revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
index 6fbb88c..4f86001 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
@@ -17,6 +17,7 @@
package org.apache.nifi.remote.protocol;
import java.io.IOException;
+import java.util.Optional;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
@@ -24,6 +25,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
@@ -131,9 +133,11 @@ public interface ServerProtocol extends VersionedRemoteResource {
* a cluster, sends info about itself
*
* @param peer peer
+ * @param clusterNodeInfo the cluster information
+ *
* @throws java.io.IOException ioe
*/
- void sendPeerList(Peer peer) throws IOException;
+ void sendPeerList(Peer peer, Optional<ClusterNodeInformation> clusterNodeInfo) throws IOException;
void shutdown(Peer peer);
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
new file mode 100644
index 0000000..9f8439c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformant;
+import org.apache.nifi.remote.cluster.NodeInformation;
+
+public class ClusterCoordinatorNodeInformant implements NodeInformant {
+ private final ClusterCoordinator clusterCoordinator;
+
+ public ClusterCoordinatorNodeInformant(final ClusterCoordinator coordinator) {
+ this.clusterCoordinator = coordinator;
+ }
+
+ @Override
+ public ClusterNodeInformation getNodeInformation() {
+ final List<NodeInformation> nodeInfoCollection = new ArrayList<>();
+ final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
+
+ // TODO: Get total number of FlowFiles for each node
+ for (final NodeIdentifier nodeId : nodeIds) {
+ final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
+ nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), 0);
+ nodeInfoCollection.add(nodeInfo);
+ }
+
+ final ClusterNodeInformation nodeInfo = new ClusterNodeInformation();
+ nodeInfo.setNodeInformation(nodeInfoCollection);
+ return nodeInfo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index fd2f588..ac004e1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -198,6 +198,7 @@ import org.apache.nifi.remote.StandardRemoteProcessGroup;
import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.StandardRootGroupPort;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
import org.apache.nifi.reporting.Bulletin;
@@ -298,9 +299,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final Integer remoteInputSocketPort;
private final Integer remoteInputHttpPort;
private final Boolean isSiteToSiteSecure;
- private Integer clusterManagerRemoteSitePort = null;
- private Integer clusterManagerRemoteSiteHttpPort = null;
- private Boolean clusterManagerRemoteSiteCommsSecure = null;
private ProcessGroup rootGroup;
private final List<Connectable> startConnectablesAfterInitialization;
@@ -411,8 +409,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
clusterCoordinator,
heartbeatMonitor);
- flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure());
-
return flowController;
}
@@ -525,11 +521,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} else {
// Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class);
- externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null));
+
+ final NodeInformant nodeInformant = configuredForClustering ? new ClusterCoordinatorNodeInformant(clusterCoordinator) : null;
+ externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nodeInformant));
}
if (remoteInputHttpPort == null) {
- LOG.info("Not enabling HTTP(S) Site-to-Site functionality because nifi.remote.input.html.enabled is not true");
+ LOG.info("Not enabling HTTP(S) Site-to-Site functionality because the '" + NiFiProperties.SITE_TO_SITE_HTTP_ENABLED + "' property is not true");
} else {
externalSiteListeners.add(HttpRemoteSiteListener.getInstance());
}
@@ -3895,45 +3893,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return new ArrayList<>(history.getActions());
}
- public void setClusterManagerRemoteSiteInfo(final Integer managerListeningPort, final Integer managerListeningHttpPort, final Boolean commsSecure) {
- writeLock.lock();
- try {
- clusterManagerRemoteSitePort = managerListeningPort;
- clusterManagerRemoteSiteHttpPort = managerListeningHttpPort;
- clusterManagerRemoteSiteCommsSecure = commsSecure;
- } finally {
- writeLock.unlock();
- }
- }
-
- public Integer getClusterManagerRemoteSiteListeningPort() {
- readLock.lock();
- try {
- return clusterManagerRemoteSitePort;
- } finally {
- readLock.unlock();
- }
- }
-
-
- public Integer getClusterManagerRemoteSiteListeningHttpPort() {
- readLock.lock();
- try {
- return clusterManagerRemoteSiteHttpPort;
- } finally {
- readLock.unlock();
- }
- }
-
- public Boolean isClusterManagerRemoteSiteCommsSecure() {
- readLock.lock();
- try {
- return clusterManagerRemoteSiteCommsSecure;
- } finally {
- readLock.unlock();
- }
- }
-
public Integer getRemoteSiteListeningPort() {
return remoteInputSocketPort;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 801a4e2..49f32c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -467,7 +467,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setClustered(true, null);
clusterCoordinator.setConnected(false);
- controller.setClusterManagerRemoteSiteInfo(null, null, null);
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
/*
@@ -586,9 +585,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// reconnect
final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(),
- request.getManagerRemoteSiteListeningPort(), request.getManagerRemoteSiteListeningHttpPort(),
- request.isManagerRemoteSiteCommsSecure(), request.getInstanceId(),
- request.getNodeConnectionStatuses(), request.getComponentRevisions());
+ request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions());
connectionResponse.setCoordinatorDN(request.getRequestorDN());
loadFromConnectionResponse(connectionResponse);
@@ -853,7 +850,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// mark the node as clustered
controller.setClustered(true, response.getInstanceId(), response.getCoordinatorDN());
- controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.getManagerRemoteInputHttpPort(), response.isManagerRemoteCommsSecure());
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
final Set<String> roles = status == null ? Collections.emptySet() : status.getRoles();
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index fb9da32..a5f66ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -1138,9 +1138,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void run() {
- try (
- final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient();
- ){
+ try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()){
try {
final ControllerDTO dto = apiClient.getController();
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
index 7de36c8..fb0ce7c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
@@ -44,6 +44,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
private AuditService auditService;
private StringEncryptor encryptor;
private BulletinRepository bulletinRepository;
+ private ClusterCoordinator clusterCoordinator;
@Override
public Object getObject() throws Exception {
@@ -53,7 +54,6 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
if (properties.isNode()) {
final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class);
final HeartbeatMonitor heartbeatMonitor = applicationContext.getBean("heartbeatMonitor", HeartbeatMonitor.class);
- final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
flowController = FlowController.createClusteredInstance(
flowFileEventRepository,
properties,
@@ -114,4 +114,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
this.bulletinRepository = bulletinRepository;
}
+ public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
+ this.clusterCoordinator = clusterCoordinator;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index 1004baf..3cd5159 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -40,6 +40,7 @@
<property name="auditService" ref="auditService" />
<property name="encryptor" ref="stringEncryptor" />
<property name="bulletinRepository" ref="bulletinRepository" />
+ <property name="clusterCoordinator" ref="clusterCoordinator" />
</bean>
<!-- flow service -->
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
index acf7fc5..08fb188 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
@@ -47,7 +47,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
private final Map<String, TransactionWrapper> transactions = new ConcurrentHashMap<>();
private final ScheduledExecutorService taskExecutor;
- private final int httpListenPort;
private ProcessGroup rootGroup;
private ScheduledFuture<?> transactionMaintenanceTask;
@@ -76,9 +75,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
SITE_TO_SITE_HTTP_TRANSACTION_TTL, e.getMessage(), txTtlSec);
}
transactionTtlSec = txTtlSec;
-
- httpListenPort = properties.getRemoteInputHttpPort() != null ? properties.getRemoteInputHttpPort() : 0;
-
}
public static HttpRemoteSiteListener getInstance() {
@@ -130,9 +126,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
try {
Set<String> transactionIds = transactions.keySet().stream().collect(Collectors.toSet());
transactionIds.stream().filter(tid -> !isTransactionActive(tid))
- .forEach(tid -> {
- cancelTransaction(tid);
- });
+ .forEach(tid -> cancelTransaction(tid));
} catch (Exception e) {
// Swallow exception so that this thread can keep working.
logger.error("An exception occurred while maintaining transactions", e);
@@ -161,10 +155,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
}
}
- @Override
- public int getPort() {
- return httpListenPort;
- }
@Override
public void stop() {
@@ -225,7 +215,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
return transaction.transaction;
}
- public void extendsTransaction(final String transactionId) throws IllegalStateException {
+ public void extendTransaction(final String transactionId) throws IllegalStateException {
if (!isTransactionActive(transactionId)){
throw new IllegalStateException("Transaction was not found or not active anymore. transactionId=" + transactionId);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
index 6f7b977..1183fc5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
@@ -26,8 +26,5 @@ public interface RemoteSiteListener {
void start() throws IOException;
- int getPort();
-
void stop();
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index a5d4bbe..814d0e6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -43,6 +43,7 @@ import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -266,7 +267,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
protocol.getPort().receiveFlowFiles(peer, protocol);
break;
case REQUEST_PEER_LIST:
- protocol.sendPeerList(peer);
+ protocol.sendPeerList(peer, nodeInformant == null ? Optional.empty() : Optional.of(nodeInformant.getNodeInformation()));
break;
case SHUTDOWN:
protocol.shutdown(peer);
@@ -321,8 +322,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
listenerThread.start();
}
- @Override
- public int getPort() {
+ private int getPort() {
return socketPort;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
deleted file mode 100644
index f187625..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.http;
-
-import org.apache.nifi.remote.HttpRemoteSiteListener;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.codec.StandardFlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
-import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.FlowFileTransaction;
-import org.apache.nifi.remote.protocol.HandshakenProperties;
-import org.apache.nifi.remote.protocol.RequestType;
-import org.apache.nifi.remote.protocol.Response;
-import org.apache.nifi.remote.protocol.ResponseCode;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol {
-
- public static final String RESOURCE_NAME = "HttpFlowFileProtocol";
-
- private final FlowFileCodec codec = new StandardFlowFileCodec();
- private final VersionNegotiator versionNegotiator;
- private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
-
- public HttpFlowFileServerProtocolImpl(VersionNegotiator versionNegotiator) {
- super();
- this.versionNegotiator = versionNegotiator;
- }
-
- @Override
- public FlowFileCodec negotiateCodec(final Peer peer) throws IOException {
- return codec;
- }
-
- @Override
- public FlowFileCodec getPreNegotiatedCodec() {
- return codec;
- }
-
- @Override
- protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
- HandshakenProperties confirmed = new HandshakenProperties();
-
- HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
- confirmed.setCommsIdentifier(commsSession.getTransactionId());
- validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
-
- logger.debug("{} Done handshake, confirmed={}", this, confirmed);
- return confirmed;
- }
-
- @Override
- protected void writeTransactionResponse(boolean isTransfer, ResponseCode response, CommunicationsSession commsSession, String explanation) throws IOException {
- HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession;
-
- commSession.setResponseCode(response);
- if(isTransfer){
- switch (response) {
- case NO_MORE_DATA:
- logger.debug("{} There's no data to send.", this);
- break;
- case CONTINUE_TRANSACTION:
- logger.debug("{} Continue transaction... expecting more flow files.", this);
- commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
- break;
- case BAD_CHECKSUM:
- logger.debug("{} Received BAD_CHECKSUM.", this);
- commSession.setStatus(Transaction.TransactionState.ERROR);
- break;
- case CONFIRM_TRANSACTION:
- logger.debug("{} Transaction is confirmed.", this);
- commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
- break;
- case FINISH_TRANSACTION:
- logger.debug("{} transaction is completed.", this);
- commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
- break;
- }
- } else {
- switch (response) {
- case CONFIRM_TRANSACTION:
- logger.debug("{} Confirming transaction. checksum={}", this, explanation);
- commSession.setChecksum(explanation);
- commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
- break;
- case TRANSACTION_FINISHED:
- case TRANSACTION_FINISHED_BUT_DESTINATION_FULL:
- logger.debug("{} Transaction is completed. responseCode={}", this, response);
- commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
- break;
- }
- }
- }
-
- @Override
- protected Response readTransactionResponse(boolean isTransfer, CommunicationsSession commsSession) throws IOException {
- // Returns Response based on current status.
- HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession;
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Transaction.TransactionState currentStatus = commSession.getStatus();
- if(isTransfer){
- switch (currentStatus){
- case DATA_EXCHANGED:
- String clientChecksum = commSession.getChecksum();
- logger.debug("readTransactionResponse. clientChecksum={}", clientChecksum);
- ResponseCode.CONFIRM_TRANSACTION.writeResponse(new DataOutputStream(bos), clientChecksum);
- break;
- case TRANSACTION_CONFIRMED:
- logger.debug("readTransactionResponse. finishing.");
- ResponseCode.TRANSACTION_FINISHED.writeResponse(new DataOutputStream(bos));
- break;
- }
- } else {
- switch (currentStatus){
- case TRANSACTION_STARTED:
- logger.debug("readTransactionResponse. returning CONTINUE_TRANSACTION.");
- // We don't know if there's more data to receive, so just continue it.
- ResponseCode.CONTINUE_TRANSACTION.writeResponse(new DataOutputStream(bos));
- break;
- case TRANSACTION_CONFIRMED:
- // Checksum was successfully validated at client side, or BAD_CHECKSUM is returned.
- ResponseCode responseCode = commSession.getResponseCode();
- logger.debug("readTransactionResponse. responseCode={}", responseCode);
- if(responseCode.containsMessage()){
- responseCode.writeResponse(new DataOutputStream(bos), "");
- } else {
- responseCode.writeResponse(new DataOutputStream(bos));
- }
- break;
- }
- }
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- return Response.read(new DataInputStream(bis));
- }
-
- private int holdTransaction(Peer peer, FlowFileTransaction transaction) {
- // We don't commit the session here yet,
- // to avoid losing sent flow files in case some issue happens at client side while it is processing,
- // hold the transaction until we confirm additional request from client.
- HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
- String transactionId = commSession.getTransactionId();
- logger.debug("{} Holding transaction. transactionId={}", this, transactionId);
- transactionManager.holdTransaction(transactionId, transaction);
-
- return transaction.getFlowFilesSent().size();
- }
-
- @Override
- protected int commitTransferTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
- return holdTransaction(peer, transaction);
- }
-
- public int commitTransferTransaction(Peer peer, String clientChecksum) throws IOException, IllegalStateException {
- logger.debug("{} Committing the transfer transaction. peer={} clientChecksum={}", this, peer, clientChecksum);
- HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
- String transactionId = commSession.getTransactionId();
- FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId);
- commSession.setChecksum(clientChecksum);
- commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
- return super.commitTransferTransaction(peer, transaction);
- }
-
- @Override
- protected int commitReceiveTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
- return holdTransaction(peer, transaction);
- }
-
- public int commitReceiveTransaction(Peer peer) throws IOException, IllegalStateException {
- logger.debug("{} Committing the receive transaction. peer={}", this, peer);
- HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
- String transactionId = commSession.getTransactionId();
- FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId);
- commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
- return super.commitReceiveTransaction(peer, transaction);
- }
-
- @Override
- public RequestType getRequestType(final Peer peer) throws IOException {
- return null;
- }
-
- @Override
- public VersionNegotiator getVersionNegotiator() {
- return versionNegotiator;
- }
-
- @Override
- public void sendPeerList(final Peer peer) throws IOException {
- }
-
- @Override
- public String getResourceName() {
- return RESOURCE_NAME;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
new file mode 100644
index 0000000..c4f1f5c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.http;
+
+import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
+import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.remote.protocol.HandshakenProperties;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.protocol.Response;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol {
+
+ public static final String RESOURCE_NAME = "HttpFlowFileProtocol";
+
+ private final FlowFileCodec codec = new StandardFlowFileCodec();
+ private final VersionNegotiator versionNegotiator;
+ private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
+
+ public StandardHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
+ super();
+ this.versionNegotiator = versionNegotiator;
+ }
+
+ @Override
+ public FlowFileCodec negotiateCodec(final Peer peer) throws IOException {
+ return codec;
+ }
+
+ @Override
+ public FlowFileCodec getPreNegotiatedCodec() {
+ return codec;
+ }
+
+ @Override
+ protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
+ HandshakenProperties confirmed = new HandshakenProperties();
+
+ HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+ confirmed.setCommsIdentifier(commsSession.getTransactionId());
+ validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
+
+ logger.debug("{} Done handshake, confirmed={}", this, confirmed);
+ return confirmed;
+ }
+
+ @Override
+ protected void writeTransactionResponse(boolean isTransfer, ResponseCode response, CommunicationsSession commsSession, String explanation) throws IOException {
+ HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession;
+
+ commSession.setResponseCode(response);
+ if(isTransfer){
+ switch (response) {
+ case NO_MORE_DATA:
+ logger.debug("{} There's no data to send.", this);
+ break;
+ case CONTINUE_TRANSACTION:
+ logger.debug("{} Continue transaction... expecting more flow files.", this);
+ commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
+ break;
+ case BAD_CHECKSUM:
+ logger.debug("{} Received BAD_CHECKSUM.", this);
+ commSession.setStatus(Transaction.TransactionState.ERROR);
+ break;
+ case CONFIRM_TRANSACTION:
+ logger.debug("{} Transaction is confirmed.", this);
+ commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
+ break;
+ case FINISH_TRANSACTION:
+ logger.debug("{} transaction is completed.", this);
+ commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
+ break;
+ }
+ } else {
+ switch (response) {
+ case CONFIRM_TRANSACTION:
+ logger.debug("{} Confirming transaction. checksum={}", this, explanation);
+ commSession.setChecksum(explanation);
+ commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
+ break;
+ case TRANSACTION_FINISHED:
+ case TRANSACTION_FINISHED_BUT_DESTINATION_FULL:
+ logger.debug("{} Transaction is completed. responseCode={}", this, response);
+ commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
+ break;
+ }
+ }
+ }
+
+ @Override
+ protected Response readTransactionResponse(boolean isTransfer, CommunicationsSession commsSession) throws IOException {
+ // Returns Response based on current status.
+ HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession;
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Transaction.TransactionState currentStatus = commSession.getStatus();
+ if(isTransfer){
+ switch (currentStatus){
+ case DATA_EXCHANGED:
+ String clientChecksum = commSession.getChecksum();
+ logger.debug("readTransactionResponse. clientChecksum={}", clientChecksum);
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(new DataOutputStream(bos), clientChecksum);
+ break;
+ case TRANSACTION_CONFIRMED:
+ logger.debug("readTransactionResponse. finishing.");
+ ResponseCode.TRANSACTION_FINISHED.writeResponse(new DataOutputStream(bos));
+ break;
+ }
+ } else {
+ switch (currentStatus){
+ case TRANSACTION_STARTED:
+ logger.debug("readTransactionResponse. returning CONTINUE_TRANSACTION.");
+ // We don't know if there's more data to receive, so just continue it.
+ ResponseCode.CONTINUE_TRANSACTION.writeResponse(new DataOutputStream(bos));
+ break;
+ case TRANSACTION_CONFIRMED:
+ // Checksum was successfully validated at client side, or BAD_CHECKSUM is returned.
+ ResponseCode responseCode = commSession.getResponseCode();
+ logger.debug("readTransactionResponse. responseCode={}", responseCode);
+ if(responseCode.containsMessage()){
+ responseCode.writeResponse(new DataOutputStream(bos), "");
+ } else {
+ responseCode.writeResponse(new DataOutputStream(bos));
+ }
+ break;
+ }
+ }
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ return Response.read(new DataInputStream(bis));
+ }
+
+ private int holdTransaction(Peer peer, FlowFileTransaction transaction) {
+ // We don't commit the session here yet.
+ // To avoid losing sent flow files if an issue occurs on the client side while it is processing them,
+ // we hold the transaction until the client confirms it with an additional request.
+ HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+ String transactionId = commSession.getTransactionId();
+ logger.debug("{} Holding transaction. transactionId={}", this, transactionId);
+ transactionManager.holdTransaction(transactionId, transaction);
+
+ return transaction.getFlowFilesSent().size();
+ }
+
+ @Override
+ protected int commitTransferTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
+ return holdTransaction(peer, transaction);
+ }
+
+ @Override
+ public int commitTransferTransaction(Peer peer, String clientChecksum) throws IOException, IllegalStateException {
+ logger.debug("{} Committing the transfer transaction. peer={} clientChecksum={}", this, peer, clientChecksum);
+ HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+ String transactionId = commSession.getTransactionId();
+ FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId);
+ commSession.setChecksum(clientChecksum);
+ commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
+ return super.commitTransferTransaction(peer, transaction);
+ }
+
+ @Override
+ protected int commitReceiveTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
+ return holdTransaction(peer, transaction);
+ }
+
+ @Override
+ public int commitReceiveTransaction(Peer peer) throws IOException, IllegalStateException {
+ logger.debug("{} Committing the receive transaction. peer={}", this, peer);
+ HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+ String transactionId = commSession.getTransactionId();
+ FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId);
+ commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
+ return super.commitReceiveTransaction(peer, transaction);
+ }
+
+ @Override
+ public RequestType getRequestType(final Peer peer) throws IOException {
+ return null;
+ }
+
+ @Override
+ public VersionNegotiator getVersionNegotiator() {
+ return versionNegotiator;
+ }
+
+ @Override
+ public void sendPeerList(final Peer peer, final Optional<ClusterNodeInformation> clusterNodeInformation) throws IOException {
+ }
+
+ @Override
+ public String getResourceName() {
+ return RESOURCE_NAME;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
deleted file mode 100644
index af6860b..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.RootGroupPort;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformant;
-import org.apache.nifi.remote.cluster.NodeInformation;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.HandshakeProperty;
-import org.apache.nifi.remote.protocol.RequestType;
-import org.apache.nifi.remote.protocol.ResponseCode;
-import org.apache.nifi.remote.protocol.ServerProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ClusterManagerServerProtocol implements ServerProtocol {
-
- public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
-
- private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
- private final Logger logger = LoggerFactory.getLogger(ClusterManagerServerProtocol.class);
- private NodeInformant nodeInformant;
-
- private String commsIdentifier;
- private boolean shutdown = false;
- private boolean handshakeCompleted = false;
- private long requestExpirationMillis = 30000L;
-
- public ClusterManagerServerProtocol() {
- }
-
- @Override
- public void setNodeInformant(final NodeInformant nodeInformant) {
- this.nodeInformant = nodeInformant;
- }
-
- @Override
- public void handshake(final Peer peer) throws IOException, HandshakeException {
- if (handshakeCompleted) {
- throw new IllegalStateException("Handshake has already been completed");
- }
- if (shutdown) {
- throw new IllegalStateException("Protocol is shutdown");
- }
-
- final CommunicationsSession commsSession = peer.getCommunicationsSession();
- final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
- final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
- // read communications identifier
- commsIdentifier = dis.readUTF();
-
- // read all of the properties. we don't really care what the properties are.
- final int numProperties = dis.readInt();
- for (int i = 0; i < numProperties; i++) {
- final String propertyName = dis.readUTF();
- final String propertyValue = dis.readUTF();
-
- final HandshakeProperty property;
- try {
- property = HandshakeProperty.valueOf(propertyName);
- if (HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property)) {
- requestExpirationMillis = Long.parseLong(propertyValue);
- }
- } catch (final Exception e) {
- }
- }
-
- // send "OK" response
- ResponseCode.PROPERTIES_OK.writeResponse(dos);
-
- logger.debug("Successfully completed handshake with {}; CommsID={}", peer, commsIdentifier);
- handshakeCompleted = true;
- }
-
- @Override
- public boolean isHandshakeSuccessful() {
- return handshakeCompleted;
- }
-
- @Override
- public void sendPeerList(final Peer peer) throws IOException {
- if (!handshakeCompleted) {
- throw new IllegalStateException("Handshake has not been completed");
- }
- if (shutdown) {
- throw new IllegalStateException("Protocol is shutdown");
- }
-
- final CommunicationsSession commsSession = peer.getCommunicationsSession();
- final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
- final ClusterNodeInformation clusterNodeInfo = nodeInformant.getNodeInformation();
- final Collection<NodeInformation> nodeInfos = clusterNodeInfo.getNodeInformation();
-
- // determine how many nodes have Site-to-site enabled
- int numPeers = 0;
- for (final NodeInformation nodeInfo : nodeInfos) {
- if (nodeInfo.getSiteToSitePort() != null) {
- numPeers++;
- }
- }
-
- dos.writeInt(numPeers);
- for (final NodeInformation nodeInfo : nodeInfos) {
- if (nodeInfo.getSiteToSitePort() == null) {
- continue;
- }
-
- dos.writeUTF(nodeInfo.getSiteToSiteHostname());
- dos.writeInt(nodeInfo.getSiteToSitePort());
- dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
- dos.writeInt(nodeInfo.getTotalFlowFiles());
- }
-
- logger.info("Redirected {} to {} nodes", peer, numPeers);
-
- dos.flush();
- }
-
- @Override
- public void shutdown(final Peer peer) {
- shutdown = true;
- }
-
- @Override
- public boolean isShutdown() {
- return shutdown;
- }
-
- @Override
- public FlowFileCodec negotiateCodec(Peer peer) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public FlowFileCodec getPreNegotiatedCodec() {
- return null;
- }
-
- @Override
- public RequestType getRequestType(final Peer peer) throws IOException {
- final CommunicationsSession commsSession = peer.getCommunicationsSession();
- final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
- return RequestType.readRequestType(dis);
- }
-
- @Override
- public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public VersionNegotiator getVersionNegotiator() {
- return versionNegotiator;
- }
-
- @Override
- public String getResourceName() {
- return RESOURCE_NAME;
- }
-
- @Override
- public void setRootProcessGroup(final ProcessGroup rootGroup) {
- }
-
- @Override
- public RootGroupPort getPort() {
- return null;
- }
-
- @Override
- public long getRequestExpiration() {
- return requestExpirationMillis;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index a2a7223..fe7d163 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -20,6 +20,8 @@ import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.RemoteResourceFactory;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.ProtocolException;
@@ -34,14 +36,19 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol {
public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
- private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
+ // Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0
+ private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
@Override
protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
@@ -147,7 +154,7 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
}
@Override
- public void sendPeerList(final Peer peer) throws IOException {
+ public void sendPeerList(final Peer peer, final Optional<ClusterNodeInformation> clusterNodeInfo) throws IOException {
if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
@@ -167,12 +174,36 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
}
logger.debug("{} Advertising Remote Input host name {}", this, peer);
- // we have only 1 peer: ourselves.
- dos.writeInt(1);
- dos.writeUTF(remoteInputHost);
- dos.writeInt(properties.getRemoteInputPort());
- dos.writeBoolean(properties.isSiteToSiteSecure());
- dos.writeInt(0); // doesn't matter how many FlowFiles we have, because we're the only host.
+ List<NodeInformation> nodeInfos;
+ if (clusterNodeInfo.isPresent()) {
+ nodeInfos = new ArrayList<>(clusterNodeInfo.get().getNodeInformation());
+ } else {
+ final NodeInformation self = new NodeInformation(remoteInputHost, properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.getRemoteInputHttpPort(),
+ properties.isSiteToSiteSecure(), 0);
+ nodeInfos = Collections.singletonList(self);
+ }
+
+ // determine how many nodes have Site-to-site enabled
+ int numPeers = 0;
+ for (final NodeInformation nodeInfo : nodeInfos) {
+ if (nodeInfo.getSiteToSitePort() != null) {
+ numPeers++;
+ }
+ }
+
+ dos.writeInt(numPeers);
+ for (final NodeInformation nodeInfo : nodeInfos) {
+ if (nodeInfo.getSiteToSitePort() == null) {
+ continue;
+ }
+
+ dos.writeUTF(nodeInfo.getSiteToSiteHostname());
+ dos.writeInt(nodeInfo.getSiteToSitePort());
+ dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
+ dos.writeInt(nodeInfo.getTotalFlowFiles());
+ }
+
+ logger.info("Sending list of {} peers back to client {}", numPeers, peer);
dos.flush();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol
index fe2182f..67a7a9c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol
@@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol
-org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol
\ No newline at end of file
+
+org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
index a8900c9..4519ddd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
@@ -80,7 +80,7 @@ public class TestHttpFlowFileServerProtocol {
final PeerDescription description = new PeerDescription("peer-host", 8080, false);
final InputStream inputStream = new ByteArrayInputStream(new byte[]{});
final OutputStream outputStream = new ByteArrayOutputStream();
- final HttpServerCommunicationsSession commsSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
+ final HttpServerCommunicationsSession commsSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId, "user");
commsSession.putHandshakeParam(HandshakeProperty.GZIP, "false");
commsSession.putHandshakeParam(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, "1234");
final String peerUrl = "http://peer-host:8080/";
@@ -90,7 +90,7 @@ public class TestHttpFlowFileServerProtocol {
private HttpFlowFileServerProtocol getDefaultHttpFlowFileServerProtocol() {
final StandardVersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
- return new HttpFlowFileServerProtocolImpl(versionNegotiator);
+ return new StandardHttpFlowFileServerProtocol(versionNegotiator);
}
@Test
@@ -101,7 +101,7 @@ public class TestHttpFlowFileServerProtocol {
try {
serverProtocol.handshake(peer);
fail();
- } catch (HandshakeException e) {
+ } catch (final HandshakeException e) {
assertEquals(ResponseCode.MISSING_PROPERTY, e.getResponseCode());
}
@@ -122,7 +122,7 @@ public class TestHttpFlowFileServerProtocol {
try {
serverProtocol.handshake(peer);
fail();
- } catch (HandshakeException e) {
+ } catch (final HandshakeException e) {
assertEquals(ResponseCode.UNKNOWN_PORT, e.getResponseCode());
}
@@ -147,7 +147,7 @@ public class TestHttpFlowFileServerProtocol {
try {
serverProtocol.handshake(peer);
fail();
- } catch (HandshakeException e) {
+ } catch (final HandshakeException e) {
assertEquals(ResponseCode.UNAUTHORIZED, e.getResponseCode());
}
@@ -173,7 +173,7 @@ public class TestHttpFlowFileServerProtocol {
try {
serverProtocol.handshake(peer);
fail();
- } catch (HandshakeException e) {
+ } catch (final HandshakeException e) {
assertEquals(ResponseCode.PORT_NOT_IN_VALID_STATE, e.getResponseCode());
}
@@ -196,7 +196,7 @@ public class TestHttpFlowFileServerProtocol {
doReturn(true).when(authResult).isAuthorized();
doReturn(true).when(port).isValid();
doReturn(true).when(port).isRunning();
- Set<Connection> connections = new HashSet<>();
+ final Set<Connection> connections = new HashSet<>();
final Connection connection = mock(Connection.class);
connections.add(connection);
doReturn(connections).when(port).getConnections();
@@ -208,7 +208,7 @@ public class TestHttpFlowFileServerProtocol {
try {
serverProtocol.handshake(peer);
fail();
- } catch (HandshakeException e) {
+ } catch (final HandshakeException e) {
assertEquals(ResponseCode.PORTS_DESTINATION_FULL, e.getResponseCode());
}
@@ -237,13 +237,13 @@ public class TestHttpFlowFileServerProtocol {
try {
serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
fail("transferFlowFiles should fail since it's already shutdown.");
- } catch (IllegalStateException e) {
+ } catch (final IllegalStateException e) {
}
try {
serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
fail("receiveFlowFiles should fail since it's already shutdown.");
- } catch (IllegalStateException e) {
+ } catch (final IllegalStateException e) {
}
}
@@ -288,12 +288,12 @@ public class TestHttpFlowFileServerProtocol {
try {
serverProtocol.commitTransferTransaction(peer, "client-sent-wrong-checksum");
fail();
- } catch (IOException e) {
+ } catch (final IOException e) {
assertTrue(e.getMessage().contains("CRC32 Checksum"));
}
}
- private Peer transferOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId) throws IOException {
+ private Peer transferOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId) throws IOException {
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
final Peer peer = getDefaultPeer(transactionId);
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
@@ -312,21 +312,21 @@ public class TestHttpFlowFileServerProtocol {
doReturn(flowFile).when(processSession).get();
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
- String peerUrl = (String)invocation.getArguments()[1];
- String detail = (String)invocation.getArguments()[2];
+ final String peerUrl = (String)invocation.getArguments()[1];
+ final String detail = (String)invocation.getArguments()[2];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter).send(eq(flowFile), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
doAnswer(invocation -> {
- InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
+ final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
return null;
}).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
// Execute test using mock
- int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
+ final int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
assertEquals(1, flowFileSent);
assertTrue(remoteSiteListener.isTransactionActive(transactionId));
@@ -360,8 +360,8 @@ public class TestHttpFlowFileServerProtocol {
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
- String peerUrl = (String)invocation.getArguments()[1];
- String detail = (String)invocation.getArguments()[2];
+ final String peerUrl = (String)invocation.getArguments()[1];
+ final String detail = (String)invocation.getArguments()[2];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
@@ -369,15 +369,15 @@ public class TestHttpFlowFileServerProtocol {
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
- String peerUrl = (String)invocation.getArguments()[1];
- String detail = (String)invocation.getArguments()[2];
+ final String peerUrl = (String)invocation.getArguments()[1];
+ final String detail = (String)invocation.getArguments()[2];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter).send(eq(flowFile2), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
doAnswer(invocation -> {
- InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
+ final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
return null;
}).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
@@ -397,7 +397,7 @@ public class TestHttpFlowFileServerProtocol {
final String contents = "Content from client.";
final byte[] bytes = contents.getBytes();
final InputStream in = new ByteArrayInputStream(bytes);
- Map<String, String> attributes = new HashMap<>();
+ final Map<String, String> attributes = new HashMap<>();
attributes.put("client-attr-1", "client-attr-1-value");
attributes.put("client-attr-2", "client-attr-2-value");
return new StandardDataPacket(attributes, in, bytes.length);
@@ -458,12 +458,12 @@ public class TestHttpFlowFileServerProtocol {
try {
serverProtocol.commitReceiveTransaction(peer);
fail();
- } catch (IOException e) {
+ } catch (final IOException e) {
assertTrue(e.getMessage().contains("Received a BadChecksum response"));
}
}
- private void receiveOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId, Peer peer) throws IOException {
+ private void receiveOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer) throws IOException {
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
@@ -479,7 +479,7 @@ public class TestHttpFlowFileServerProtocol {
final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
final FlowFile flowFile = mock(FlowFile.class);
- DataPacket dataPacket = createClientDataPacket();
+ final DataPacket dataPacket = createClientDataPacket();
final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream();
negotiatedCoded.encode(dataPacket, testDataOs);
@@ -488,7 +488,7 @@ public class TestHttpFlowFileServerProtocol {
((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
doAnswer(invocation -> {
- InputStream is = (InputStream) invocation.getArguments()[0];
+ final InputStream is = (InputStream) invocation.getArguments()[0];
for (int b; (b = is.read()) >= 0;) {
// consume stream.
}
@@ -499,21 +499,21 @@ public class TestHttpFlowFileServerProtocol {
doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
- String peerUrl = (String)invocation.getArguments()[1];
- String detail = (String)invocation.getArguments()[3];
+ final String peerUrl = (String)invocation.getArguments()[1];
+ final String detail = (String)invocation.getArguments()[3];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter)
.receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
- Set<Relationship> relations = new HashSet<>();
+ final Set<Relationship> relations = new HashSet<>();
final Relationship relationship = new Relationship.Builder().build();
relations.add(relationship);
doReturn(relations).when(context).getAvailableRelationships();
// Execute test using mock
- int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
+ final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
assertEquals(1, flowFileReceived);
assertTrue(remoteSiteListener.isTransactionActive(transactionId));
@@ -549,7 +549,7 @@ public class TestHttpFlowFileServerProtocol {
((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
doAnswer(invocation -> {
- InputStream is = (InputStream) invocation.getArguments()[0];
+ final InputStream is = (InputStream) invocation.getArguments()[0];
for (int b; (b = is.read()) >= 0;) {
// consume stream.
}
@@ -562,15 +562,15 @@ public class TestHttpFlowFileServerProtocol {
.when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
- String peerUrl = (String)invocation.getArguments()[1];
- String detail = (String)invocation.getArguments()[3];
+ final String peerUrl = (String)invocation.getArguments()[1];
+ final String detail = (String)invocation.getArguments()[3];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter)
.receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
- Set<Relationship> relations = new HashSet<>();
+ final Set<Relationship> relations = new HashSet<>();
doReturn(relations).when(context).getAvailableRelationships();
// Execute test using mock
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 31087c9..892718e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,28 @@
*/
package org.apache.nifi.web;
-import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
@@ -194,26 +215,7 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@@ -2157,15 +2159,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerDTO.setDisabledCount(counts.getDisabledCount());
// determine the site to site configuration
- if (isClustered()) {
- controllerDTO.setRemoteSiteListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningPort());
- controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningHttpPort());
- controllerDTO.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure());
- } else {
- controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
- controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
- controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
- }
+ controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
+ controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
+ controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
return controllerDTO;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index aad8b4a..e77d769 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -49,7 +49,7 @@ import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
import org.apache.nifi.remote.protocol.HandshakeProperty;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
-import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocolImpl;
+import org.apache.nifi.remote.protocol.http.StandardHttpFlowFileServerProtocol;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.slf4j.Logger;
@@ -305,16 +305,18 @@ public class DataTransferResource extends ApplicationResource {
}
HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
- return new HttpFlowFileServerProtocolImpl(versionNegotiator);
+ return new StandardHttpFlowFileServerProtocol(versionNegotiator);
}
private Peer constructPeer(HttpServletRequest req, InputStream inputStream, OutputStream outputStream, String portId, String transactionId) {
- String clientHostName = req.getRemoteHost();
- int clientPort = req.getRemotePort();
+ final String clientHostName = req.getRemoteHost();
+ final int clientPort = req.getRemotePort();
- PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure());
+ final PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure());
- HttpServerCommunicationsSession commSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final String userDn = user == null ? null : user.getIdentity();
+ final HttpServerCommunicationsSession commSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId, userDn);
boolean useCompression = false;
final String useCompressionStr = req.getHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION);
@@ -330,20 +332,28 @@ public class DataTransferResource extends ApplicationResource {
commSession.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, portId);
commSession.putHandshakeParam(HandshakeProperty.GZIP, String.valueOf(useCompression));
- if (!isEmpty(requestExpiration)) commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration);
- if (!isEmpty(batchCount)) commSession.putHandshakeParam(BATCH_COUNT, batchCount);
- if (!isEmpty(batchSize)) commSession.putHandshakeParam(BATCH_SIZE, batchSize);
- if (!isEmpty(batchDuration)) commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
+ if (!isEmpty(requestExpiration)) {
+ commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration);
+ }
+ if (!isEmpty(batchCount)) {
+ commSession.putHandshakeParam(BATCH_COUNT, batchCount);
+ }
+ if (!isEmpty(batchSize)) {
+ commSession.putHandshakeParam(BATCH_SIZE, batchSize);
+ }
+ if (!isEmpty(batchDuration)) {
+ commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
+ }
if(peerDescription.isSecure()){
- NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
+ final NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
logger.debug("initiating peer, nifiUser={}", nifiUser);
commSession.setUserDn(nifiUser.getIdentity());
}
// TODO: This follows how SocketRemoteSiteListener defines peerUrl and clusterUrl, but more meaningful values could be used, especially for clusterUrl.
- String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
- String clusterUrl = "nifi://localhost:" + req.getLocalPort();
+ final String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
+ final String clusterUrl = "nifi://localhost:" + req.getLocalPort();
return new Peer(peerDescription, commSession, peerUrl, clusterUrl);
}
@@ -771,7 +781,7 @@ public class DataTransferResource extends ApplicationResource {
try {
// Do handshake
initiateServerProtocol(peer, transportProtocolVersion);
- transactionManager.extendsTransaction(transactionId);
+ transactionManager.extendTransaction(transactionId);
final TransactionResultEntity entity = new TransactionResultEntity();
entity.setResponseCode(ResponseCode.CONTINUE_TRANSACTION.getCode());