You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/23 20:57:07 UTC
[14/29] incubator-nifi git commit: NIFI-282: Fixed bug that caused client not to be able to communicate with remote NiFi instance
NIFI-282: Fixed bug that caused client not to be able to communicate with remote NiFi instance
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/5c8a9e22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/5c8a9e22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/5c8a9e22
Branch: refs/heads/develop
Commit: 5c8a9e22d11007487b00d42455bc630451c76f82
Parents: d1e058c
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 12 09:15:07 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 12 09:15:07 2015 -0500
----------------------------------------------------------------------
.../client/socket/EndpointConnectionPool.java | 27 +++++++++----
.../nifi/remote/client/socket/SocketClient.java | 12 ++++--
.../client/socket/TestSiteToSiteClient.java | 41 ++++++++++----------
3 files changed, 47 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 43bc8e5..c0e4761 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -114,6 +115,7 @@ public class EndpointConnectionPool {
private final SSLContext sslContext;
private final ScheduledExecutorService taskExecutor;
private final int idleExpirationMillis;
+ private final RemoteDestination remoteDestination;
private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
@@ -128,15 +130,17 @@ public class EndpointConnectionPool {
private volatile boolean shutdown = false;
- public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
- final EventReporter eventReporter, final File persistenceFile)
+ public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis,
+ final int idleExpirationMillis, final EventReporter eventReporter, final File persistenceFile)
{
- this(clusterUrl, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
+ this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
}
- public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
+ public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile)
{
+ Objects.requireNonNull(clusterUrl, "URL cannot be null");
+ Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
try {
this.clusterUrl = new URI(clusterUrl);
} catch (final URISyntaxException e) {
@@ -150,6 +154,7 @@ public class EndpointConnectionPool {
}
apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
+ this.remoteDestination = remoteDestination;
this.sslContext = sslContext;
this.peersFile = persistenceFile;
this.eventReporter = eventReporter;
@@ -197,12 +202,12 @@ public class EndpointConnectionPool {
}
- public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
- return getEndpointConnection(remoteDestination, direction, null);
+ public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+ return getEndpointConnection(direction, null);
}
- public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+ public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
//
// Attempt to get a connection state that already exists for this URL.
//
@@ -419,6 +424,7 @@ public class EndpointConnectionPool {
return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
}
+
private PeerStatus getNextPeerStatus(final TransferDirection direction) {
List<PeerStatus> peerList = peerStatuses;
if ( isPeerRefreshNeeded(peerList) ) {
@@ -532,7 +538,12 @@ public class EndpointConnectionPool {
RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos);
clientProtocol.setTimeout(commsTimeout);
- clientProtocol.handshake(peer, null);
+ if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
+ clientProtocol.handshake(peer, remoteDestination.getIdentifier());
+ } else {
+ clientProtocol.handshake(peer, null);
+ }
+
final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer);
persistPeerStatuses(peerStatuses);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index aae19b3..016e67f 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -40,9 +40,11 @@ public class SocketClient implements SiteToSiteClient {
private final String portName;
private final long penalizationNanos;
private volatile String portIdentifier;
+ private volatile boolean closed = false;
public SocketClient(final SiteToSiteClientConfig config) {
- pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
+ pool = new EndpointConnectionPool(config.getUrl(), createRemoteDestination(config.getPortIdentifier()),
+ (int) config.getTimeout(TimeUnit.MILLISECONDS),
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
@@ -107,15 +109,16 @@ public class SocketClient implements SiteToSiteClient {
@Override
public Transaction createTransaction(final TransferDirection direction) throws IOException {
+ if ( closed ) {
+ throw new IllegalStateException("Client is closed");
+ }
final String portId = getPortIdentifier(direction);
if ( portId == null ) {
throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance");
}
- final RemoteDestination remoteDestination = createRemoteDestination(portId);
-
- final EndpointConnection connectionState = pool.getEndpointConnection(remoteDestination, direction, getConfig());
+ final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig());
if ( connectionState == null ) {
return null;
}
@@ -196,6 +199,7 @@ public class SocketClient implements SiteToSiteClient {
@Override
public void close() throws IOException {
+ closed = true;
pool.shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index 2fd90f8..75becd3 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -75,28 +75,27 @@ public class TestSiteToSiteClient {
public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
- final SiteToSiteClient client = new SiteToSiteClient.Builder()
- .url("http://10.0.64.63:8080/nifi")
- .portName("input")
- .nodePenalizationPeriod(10, TimeUnit.MILLISECONDS)
- .build();
+ final SiteToSiteClient client = new SiteToSiteClient.Builder()
+ .url("http://localhost:8080/nifi")
+ .portName("input")
+ .build();
- try {
- final Transaction transaction = client.createTransaction(TransferDirection.SEND);
- Assert.assertNotNull(transaction);
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put("site-to-site", "yes, please!");
- final byte[] bytes = "Hello".getBytes();
- final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
- transaction.send(packet);
-
- transaction.confirm();
- transaction.complete();
- } finally {
- client.close();
- }
+ try {
+ final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+ Assert.assertNotNull(transaction);
+
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("site-to-site", "yes, please!");
+ final byte[] bytes = "Hello".getBytes();
+ final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
+ transaction.send(packet);
+
+ transaction.confirm();
+ transaction.complete();
+ } finally {
+ client.close();
+ }
}
}