You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/23 20:57:11 UTC
[18/29] incubator-nifi git commit: NIFI-282: Fixed bug that caused
load distribution across nodes in cluster not to work
NIFI-282: Fixed bug that caused load distribution across nodes in cluster not to work
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a7405b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a7405b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a7405b91
Branch: refs/heads/develop
Commit: a7405b915d059483135d706a7ce6a7b848b4c175
Parents: 2f60ddc
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 16 16:49:15 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 16 16:49:15 2015 -0500
----------------------------------------------------------------------
.../main/java/org/apache/nifi/remote/Peer.java | 8 +-
.../org/apache/nifi/remote/PeerDescription.java | 79 ++++++++++++
.../java/org/apache/nifi/remote/PeerStatus.java | 32 ++---
.../client/socket/EndpointConnectionPool.java | 127 ++++++++++---------
.../protocol/socket/SocketClientProtocol.java | 5 +-
.../socket/TestEndpointConnectionStatePool.java | 8 +-
.../client/socket/TestSiteToSiteClient.java | 41 +++---
.../nifi/remote/SocketRemoteSiteListener.java | 3 +-
8 files changed, 198 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index 3534f95..2428078 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -25,6 +25,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
public class Peer implements Communicant {
+ private final PeerDescription description;
private final CommunicationsSession commsSession;
private final String url;
private final String clusterUrl;
@@ -34,7 +35,8 @@ public class Peer implements Communicant {
private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
private boolean closed = false;
- public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
+ public Peer(final PeerDescription description, final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
+ this.description = description;
this.commsSession = commsSession;
this.url = peerUrl;
this.clusterUrl = clusterUrl;
@@ -48,6 +50,10 @@ public class Peer implements Communicant {
}
}
+ public PeerDescription getDescription() {
+ return description;
+ }
+
@Override
public String getUrl() {
return url;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
new file mode 100644
index 0000000..0e8e498
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Value object describing a site-to-site peer: a hostname, a port and a
+ * {@code secure} flag, all fixed at construction.
+ *
+ * <p>Only hostname and port take part in {@link #equals(Object)} and
+ * {@link #hashCode()}; the {@code secure} flag is not consulted, so two
+ * descriptions differing only in that flag compare equal.</p>
+ */
+public class PeerDescription {
+    private final String hostname;
+    private final int port;
+    private final boolean secure;
+
+    /** Stores the given hostname (null is tolerated), port and secure flag as-is. */
+    public PeerDescription(final String hostname, final int port, final boolean secure) {
+        this.hostname = hostname;
+        this.port = port;
+        this.secure = secure;
+    }
+
+    /** @return the hostname supplied at construction, possibly null */
+    public String getHostname() {
+        return hostname;
+    }
+
+    /** @return the port supplied at construction */
+    public int getPort() {
+        return port;
+    }
+
+    /** @return the secure flag supplied at construction */
+    public boolean isSecure() {
+        return secure;
+    }
+
+    /** Renders all three fields in a bracketed, human-readable form. */
+    @Override
+    public String toString() {
+        return "PeerDescription[hostname=" + hostname + ", port=" + port + ", secure=" + secure + "]";
+    }
+
+    /** Hashes hostname (0 when null) and port with the conventional 31-multiplier scheme. */
+    @Override
+    public int hashCode() {
+        return java.util.Objects.hash(hostname, port);
+    }
+
+    /** Equal only to an instance of the exact same class with the same hostname and port. */
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        final PeerDescription that = (PeerDescription) obj;
+        return port == that.port && java.util.Objects.equals(hostname, that.hostname);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
index d1cb076..b68ac33 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -17,43 +17,31 @@
package org.apache.nifi.remote;
public class PeerStatus {
-
- private final String hostname;
- private final int port;
- private final boolean secure;
+ private final PeerDescription description;
private final int numFlowFiles;
- public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) {
- this.hostname = hostname;
- this.port = port;
- this.secure = secure;
+ public PeerStatus(final PeerDescription description, final int numFlowFiles) {
+ this.description = description;
this.numFlowFiles = numFlowFiles;
}
- public String getHostname() {
- return hostname;
- }
-
- public int getPort() {
- return port;
+ public PeerDescription getPeerDescription() {
+ return description;
}
-
- public boolean isSecure() {
- return secure;
- }
-
+
public int getFlowFileCount() {
return numFlowFiles;
}
@Override
public String toString() {
- return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]";
+ return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort() +
+ ",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]";
}
@Override
public int hashCode() {
- return 9824372 + hostname.hashCode() + port;
+ return 9824372 + description.getHostname().hashCode() + description.getPort() * 41;
}
@Override
@@ -67,6 +55,6 @@ public class PeerStatus {
}
final PeerStatus other = (PeerStatus) obj;
- return port == other.port && hostname.equals(other.hostname);
+ return description.equals(other.getPeerDescription());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index f9a8a38..450daec 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -61,6 +61,7 @@ import javax.security.cert.CertificateNotYetValidException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
@@ -97,8 +98,8 @@ public class EndpointConnectionPool {
private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
- private final BlockingQueue<EndpointConnection> connectionQueue = new LinkedBlockingQueue<>();
- private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
+ private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
private final URI clusterUrl;
private final String apiUri;
@@ -227,6 +228,23 @@ public class EndpointConnectionPool {
SocketClientProtocol protocol = null;
EndpointConnection connection;
Peer peer = null;
+
+ logger.debug("{} getting next peer status", this);
+ final PeerStatus peerStatus = getNextPeerStatus(direction);
+ logger.debug("{} next peer status = {}", this, peerStatus);
+ if ( peerStatus == null ) {
+ return null;
+ }
+
+ final PeerDescription peerDescription = peerStatus.getPeerDescription();
+ // Look up the per-peer queue by PeerDescription, the map's key type; a PeerStatus key can never match.
+ BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
+ if ( connectionQueue == null ) {
+     // Another thread may register a queue concurrently; putIfAbsent keeps whichever was stored first.
+     final BlockingQueue<EndpointConnection> created = new LinkedBlockingQueue<>();
+     final BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, created);
+     connectionQueue = (existing == null) ? created : existing;
+ }
final List<EndpointConnection> addBack = new ArrayList<>();
try {
@@ -254,19 +272,12 @@ public class EndpointConnectionPool {
protocol = new SocketClientProtocol();
protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId));
- logger.debug("{} getting next peer status", this);
- final PeerStatus peerStatus = getNextPeerStatus(direction);
- logger.debug("{} next peer status = {}", this, peerStatus);
- if ( peerStatus == null ) {
- return null;
- }
-
final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
try {
logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
commsSession = establishSiteToSiteConnection(peerStatus);
} catch (final IOException ioe) {
- penalize(peerStatus, penalizationMillis);
+ penalize(peerStatus.getPeerDescription(), penalizationMillis);
throw ioe;
}
@@ -283,8 +294,8 @@ public class EndpointConnectionPool {
}
}
- final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
- peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
+ final String peerUrl = "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort();
+ peer = new Peer(peerDescription, commsSession, peerUrl, clusterUrl.toString());
// set properties based on config
if ( config != null ) {
@@ -371,6 +382,11 @@ public class EndpointConnectionPool {
return false;
}
+ final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription());
+ if ( connectionQueue == null ) {
+ return false;
+ }
+
activeConnections.remove(endpointConnection);
if ( shutdown ) {
terminate(endpointConnection);
@@ -381,14 +397,14 @@ public class EndpointConnectionPool {
}
}
- private void penalize(final PeerStatus status, final long penalizationMillis) {
- Long expiration = peerTimeoutExpirations.get(status);
+ private void penalize(final PeerDescription peerDescription, final long penalizationMillis) {
+ Long expiration = peerTimeoutExpirations.get(peerDescription);
if ( expiration == null ) {
expiration = Long.valueOf(0L);
}
final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
- peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
+ peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration));
}
/**
@@ -396,19 +412,7 @@ public class EndpointConnectionPool {
* @param peer
*/
public void penalize(final Peer peer, final long penalizationMillis) {
- String host;
- int port;
- try {
- final URI uri = new URI(peer.getUrl());
- host = uri.getHost();
- port = uri.getPort();
- } catch (final URISyntaxException e) {
- host = peer.getHost();
- port = -1;
- }
-
- final PeerStatus status = new PeerStatus(host, port, true, 1);
- penalize(status, penalizationMillis);
+ penalize(peer.getDescription(), penalizationMillis);
}
private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
@@ -509,7 +513,8 @@ public class EndpointConnectionPool {
final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
final List<NodeInformation> nodeInfos = new ArrayList<>();
for ( final PeerStatus peerStatus : statuses ) {
- final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount());
+ final PeerDescription description = peerStatus.getPeerDescription();
+ final NodeInformation nodeInfo = new NodeInformation(description.getHostname(), description.getPort(), 0, description.isSecure(), peerStatus.getFlowFileCount());
nodeInfos.add(nodeInfo);
}
clusterNodeInfo.setNodeInformation(nodeInfos);
@@ -526,7 +531,7 @@ public class EndpointConnectionPool {
if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
for (final PeerStatus status : cache.getStatuses()) {
- final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1);
+ final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1);
equalizedSet.add(equalizedStatus);
}
@@ -543,8 +548,9 @@ public class EndpointConnectionPool {
throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications");
}
+ final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://"));
final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
- final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
+ final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
final SocketClientProtocol clientProtocol = new SocketClientProtocol();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
@@ -602,7 +608,8 @@ public class EndpointConnectionPool {
final OutputStream out = new BufferedOutputStream(fos)) {
for (final PeerStatus status : statuses) {
- final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n";
+ final PeerDescription description = status.getPeerDescription();
+ final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n";
out.write(line.getBytes(StandardCharsets.UTF_8));
}
@@ -631,7 +638,7 @@ public class EndpointConnectionPool {
final int port = Integer.parseInt(splits[1]);
final boolean secure = Boolean.parseBoolean(splits[2]);
- statuses.add(new PeerStatus(hostname, port, secure, 1));
+ statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1));
}
}
@@ -640,7 +647,8 @@ public class EndpointConnectionPool {
private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
- return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort());
+ final PeerDescription description = peerStatus.getPeerDescription();
+ return establishSiteToSiteConnection(description.getHostname(), description.getPort());
}
private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
@@ -720,7 +728,8 @@ public class EndpointConnectionPool {
final int index = n % destinations.size();
PeerStatus status = destinations.get(index);
if ( status == null ) {
- status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles());
+ final PeerDescription description = new PeerDescription(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure());
+ status = new PeerStatus(description, nodeInfo.getTotalFlowFiles());
destinations.set(index, status);
break;
} else {
@@ -744,27 +753,29 @@ public class EndpointConnectionPool {
private void cleanupExpiredSockets() {
- final List<EndpointConnection> connections = new ArrayList<>();
-
- EndpointConnection connection;
- while ((connection = connectionQueue.poll()) != null) {
- // If the socket has not been used in 10 seconds, shut it down.
- final long lastUsed = connection.getLastTimeUsed();
- if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
- try {
- connection.getSocketClientProtocol().shutdown(connection.getPeer());
- } catch (final Exception e) {
- logger.debug("Failed to shut down {} using {} due to {}",
- new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
+ for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
+ final List<EndpointConnection> connections = new ArrayList<>();
+
+ EndpointConnection connection;
+ while ((connection = connectionQueue.poll()) != null) {
+ // If the socket has not been used in 10 seconds, shut it down.
+ final long lastUsed = connection.getLastTimeUsed();
+ if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
+ try {
+ connection.getSocketClientProtocol().shutdown(connection.getPeer());
+ } catch (final Exception e) {
+ logger.debug("Failed to shut down {} using {} due to {}",
+ new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
+ }
+
+ terminate(connection);
+ } else {
+ connections.add(connection);
}
-
- terminate(connection);
- } else {
- connections.add(connection);
}
+
+ connectionQueue.addAll(connections);
}
-
- connectionQueue.addAll(connections);
}
public void shutdown() {
@@ -775,10 +786,12 @@ public class EndpointConnectionPool {
for ( final EndpointConnection conn : activeConnections ) {
conn.getPeer().getCommunicationsSession().interrupt();
}
-
- EndpointConnection state;
- while ( (state = connectionQueue.poll()) != null) {
- cleanup(state.getSocketClientProtocol(), state.getPeer());
+
+ for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values() ) {
+ EndpointConnection state;
+ while ( (state = connectionQueue.poll()) != null) {
+ cleanup(state.getSocketClientProtocol(), state.getPeer());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 390f4fc..c3275ea 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -34,6 +34,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
@@ -117,7 +118,7 @@ public class SocketClientProtocol implements ClientProtocol {
properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
if ( destinationId != null ) {
- properties.put(HandshakeProperty.PORT_IDENTIFIER, destination.getIdentifier());
+ properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId);
}
properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
@@ -229,7 +230,7 @@ public class SocketClientProtocol implements ClientProtocol {
final int port = dis.readInt();
final boolean secure = dis.readBoolean();
final int flowFileCount = dis.readInt();
- peers.add(new PeerStatus(hostname, port, secure, flowFileCount));
+ peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount));
}
logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
index 275e40c..cb7af08 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -41,7 +41,7 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
- System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ System.out.println(peerStatus.getPeerDescription());
}
}
@@ -55,7 +55,7 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
- System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ System.out.println(peerStatus.getPeerDescription());
}
}
@@ -75,7 +75,7 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
- System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ System.out.println(peerStatus.getPeerDescription());
}
}
@@ -89,7 +89,7 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
- System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ System.out.println(peerStatus.getPeerDescription());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index bb16a34..0f48b03 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -43,26 +43,31 @@ public class TestSiteToSiteClient {
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("cba")
- .requestBatchCount(1)
+ .requestBatchCount(10)
.build();
try {
- final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
- Assert.assertNotNull(transaction);
-
- final DataPacket packet = transaction.receive();
- Assert.assertNotNull(packet);
-
- final InputStream in = packet.getData();
- final long size = packet.getSize();
- final byte[] buff = new byte[(int) size];
-
- StreamUtils.fillBuffer(in, buff);
-
- Assert.assertNull(transaction.receive());
-
- transaction.confirm();
- transaction.complete();
+ for (int i=0; i < 1000; i++) {
+ final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+ Assert.assertNotNull(transaction);
+
+ DataPacket packet;
+ while (true) {
+ packet = transaction.receive();
+ if ( packet == null ) {
+ break;
+ }
+
+ final InputStream in = packet.getData();
+ final long size = packet.getSize();
+ final byte[] buff = new byte[(int) size];
+
+ StreamUtils.fillBuffer(in, buff);
+ }
+
+ transaction.confirm();
+ transaction.complete();
+ }
} finally {
client.close();
}
@@ -70,7 +75,7 @@ public class TestSiteToSiteClient {
@Test
- //@Ignore("For local testing only; not really a unit test but a manual test")
+ @Ignore("For local testing only; not really a unit test but a manual test")
public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 8a4839b..493d1fe 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -206,7 +206,8 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
protocol.setRootProcessGroup(rootGroup.get());
protocol.setNodeInformant(nodeInformant);
- peer = new Peer(commsSession, peerUri, "nifi://localhost:" + getPort());
+ final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null);
+ peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
LOG.debug("Handshaking....");
protocol.handshake(peer);