You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/23 20:57:07 UTC

[14/29] incubator-nifi git commit: NIFI-282: Fixed bug that caused client not to be able to communicate with remote NiFi instance

NIFI-282: Fixed bug that caused client not to be able to communicate with remote NiFi instance


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/5c8a9e22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/5c8a9e22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/5c8a9e22

Branch: refs/heads/develop
Commit: 5c8a9e22d11007487b00d42455bc630451c76f82
Parents: d1e058c
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 12 09:15:07 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 12 09:15:07 2015 -0500

----------------------------------------------------------------------
 .../client/socket/EndpointConnectionPool.java   | 27 +++++++++----
 .../nifi/remote/client/socket/SocketClient.java | 12 ++++--
 .../client/socket/TestSiteToSiteClient.java     | 41 ++++++++++----------
 3 files changed, 47 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 43bc8e5..c0e4761 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -114,6 +115,7 @@ public class EndpointConnectionPool {
     private final SSLContext sslContext;
     private final ScheduledExecutorService taskExecutor;
     private final int idleExpirationMillis;
+    private final RemoteDestination remoteDestination;
     
     private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
     private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
@@ -128,15 +130,17 @@ public class EndpointConnectionPool {
     private volatile boolean shutdown = false;
     
     
-    public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis, 
-            final EventReporter eventReporter, final File persistenceFile) 
+    public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, 
+            final int idleExpirationMillis, final EventReporter eventReporter, final File persistenceFile) 
     {
-    	this(clusterUrl, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
+    	this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
     }
     
-    public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
+    public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
             final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) 
     {
+        Objects.requireNonNull(clusterUrl, "URL cannot be null");
+        Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
     	try {
     		this.clusterUrl = new URI(clusterUrl);
     	} catch (final URISyntaxException e) {
@@ -150,6 +154,7 @@ public class EndpointConnectionPool {
         }
         apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
         
+        this.remoteDestination = remoteDestination;
     	this.sslContext = sslContext;
     	this.peersFile = persistenceFile;
     	this.eventReporter = eventReporter;
@@ -197,12 +202,12 @@ public class EndpointConnectionPool {
     }
     
     
-    public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
-        return getEndpointConnection(remoteDestination, direction, null);
+    public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+        return getEndpointConnection(direction, null);
     }
     
     
-    public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+    public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
     	//
         // Attempt to get a connection state that already exists for this URL.
         //
@@ -419,6 +424,7 @@ public class EndpointConnectionPool {
         return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
     }
     
+    
     private PeerStatus getNextPeerStatus(final TransferDirection direction) {
         List<PeerStatus> peerList = peerStatuses;
         if ( isPeerRefreshNeeded(peerList) ) {
@@ -532,7 +538,12 @@ public class EndpointConnectionPool {
         RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos);
 
         clientProtocol.setTimeout(commsTimeout);
-        clientProtocol.handshake(peer, null);
+        if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
+            clientProtocol.handshake(peer, remoteDestination.getIdentifier());
+        } else {
+            clientProtocol.handshake(peer, null);
+        }
+        
         final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer);
         persistPeerStatuses(peerStatuses);
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index aae19b3..016e67f 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -40,9 +40,11 @@ public class SocketClient implements SiteToSiteClient {
 	private final String portName;
 	private final long penalizationNanos;
 	private volatile String portIdentifier;
+	private volatile boolean closed = false;
 	
 	public SocketClient(final SiteToSiteClientConfig config) {
-		pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
+		pool = new EndpointConnectionPool(config.getUrl(), createRemoteDestination(config.getPortIdentifier()), 
+		        (int) config.getTimeout(TimeUnit.MILLISECONDS),
 		        (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
 				config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
 		
@@ -107,15 +109,16 @@ public class SocketClient implements SiteToSiteClient {
 	
 	@Override
 	public Transaction createTransaction(final TransferDirection direction) throws IOException {
+	    if ( closed ) {
+	        throw new IllegalStateException("Client is closed");
+	    }
 		final String portId = getPortIdentifier(direction);
 		
 		if ( portId == null ) {
 			throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance");
 		}
 		
-		final RemoteDestination remoteDestination = createRemoteDestination(portId);
-		
-		final EndpointConnection connectionState = pool.getEndpointConnection(remoteDestination, direction, getConfig());
+		final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig());
 		if ( connectionState == null ) {
 		    return null;
 		}
@@ -196,6 +199,7 @@ public class SocketClient implements SiteToSiteClient {
 	
 	@Override
 	public void close() throws IOException {
+	    closed = true;
 		pool.shutdown();
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index 2fd90f8..75becd3 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -75,28 +75,27 @@ public class TestSiteToSiteClient {
     public void testSend() throws IOException {
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
         
-        final SiteToSiteClient client = new SiteToSiteClient.Builder()
-            .url("http://10.0.64.63:8080/nifi")
-            .portName("input")
-            .nodePenalizationPeriod(10, TimeUnit.MILLISECONDS)
-            .build();
+            final SiteToSiteClient client = new SiteToSiteClient.Builder()
+                .url("http://localhost:8080/nifi")
+                .portName("input")
+                .build();
         
-        try {
-            final Transaction transaction = client.createTransaction(TransferDirection.SEND);
-            Assert.assertNotNull(transaction);
-            
-            final Map<String, String> attrs = new HashMap<>();
-            attrs.put("site-to-site", "yes, please!");
-            final byte[] bytes = "Hello".getBytes();
-            final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-            final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
-            transaction.send(packet);
-
-            transaction.confirm();
-            transaction.complete();
-        } finally {
-            client.close();
-        }
+            try {
+                final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+                Assert.assertNotNull(transaction);
+                
+                    final Map<String, String> attrs = new HashMap<>();
+                    attrs.put("site-to-site", "yes, please!");
+                    final byte[] bytes = "Hello".getBytes();
+                    final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                    final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
+                    transaction.send(packet);
+                
+                transaction.confirm();
+                transaction.complete();
+            } finally {
+                client.close();
+            }
     }
     
 }