You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/19 14:31:40 UTC

incubator-nifi git commit: NIFI-282: Continue separating site-to-site functionality into utility

Repository: incubator-nifi
Updated Branches:
  refs/heads/site-to-site-client fdf758460 -> a6293e340


NIFI-282: Continue separating site-to-site functionality into utility


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a6293e34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a6293e34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a6293e34

Branch: refs/heads/site-to-site-client
Commit: a6293e34086cf449dc11be9105fe0794302b8c5a
Parents: fdf7584
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 19 08:31:34 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Jan 19 08:31:34 2015 -0500

----------------------------------------------------------------------
 .../socket/EndpointConnectionStatePool.java     |  4 -
 .../protocol/socket/SocketClientProtocol.java   |  3 +-
 .../socket/TestEndpointConnectionStatePool.java | 95 +++++++++++++++++++
 .../apache/nifi/groups/RemoteProcessGroup.java  |  3 +
 .../nifi/remote/StandardRemoteProcessGroup.java | 14 ++-
 .../nifi/remote/SocketRemoteSiteListener.java   |  2 +-
 .../nifi/remote/StandardRemoteGroupPort.java    | 41 +--------
 .../remote/TestStandardRemoteGroupPort.java     | 97 --------------------
 8 files changed, 110 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index 2dd489d..d20fb58 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -518,10 +518,6 @@ public class EndpointConnectionStatePool {
     }
     
     
-//    private List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo) throws IOException {
-//        return formulateDestinationList(clusterNodeInfo, getConnectableType());
-//    }
-    
     static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
         final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation();
         final int numDestinations = Math.max(128, nodeInfoSet.size());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 2f4f755..560385c 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -59,7 +59,6 @@ import org.slf4j.LoggerFactory;
 public class SocketClientProtocol implements ClientProtocol {
     private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
 
-    
     private RemoteDestination destination;
     private boolean useCompression;
     
@@ -105,7 +104,7 @@ public class SocketClientProtocol implements ClientProtocol {
         properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
         
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
-        commsSession.setTimeout((int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+        commsSession.setTimeout(timeoutMillis);
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
new file mode 100644
index 0000000..d8899ea
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.junit.Test;
+
+public class TestEndpointConnectionStatePool {
+
+    @Test
+    public void testFormulateDestinationListForOutput() throws IOException {
+        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
+        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240));
+        collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024));
+        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
+        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+        for ( final PeerStatus peerStatus : destinations ) {
+            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+        }
+    }
+    
+    @Test
+    public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
+        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500));
+        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+        for ( final PeerStatus peerStatus : destinations ) {
+            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+        }
+    }
+    
+    
+    
+    
+    @Test
+    public void testFormulateDestinationListForInputPorts() throws IOException {
+        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
+        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240));
+        collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024));
+        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
+        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+        for ( final PeerStatus peerStatus : destinations ) {
+            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+        }
+    }
+    
+    @Test
+    public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
+        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500));
+        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+        for ( final PeerStatus peerStatus : destinations ) {
+            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 2e35422..9f2dac8 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -27,6 +27,7 @@ import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
 
 public interface RemoteProcessGroup {
 
@@ -80,6 +81,8 @@ public interface RemoteProcessGroup {
     void setYieldDuration(final String yieldDuration);
 
     String getYieldDuration();
+    
+    EndpointConnectionStatePool getConnectionPool();
 
     /**
      * Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min")

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index bfa3d25..857add9 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -175,6 +175,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             }
         };
 
+        endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile());
+        
         final Runnable socketCleanup = new Runnable() {
             @Override
             public void run() {
@@ -187,14 +189,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                     readLock.unlock();
                 }
 
-                for (final StandardRemoteGroupPort port : ports) {
-                    port.cleanupSockets();
-                }
+                endpointConnectionPool.cleanupExpiredSockets();
             }
         };
 
-        endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile());
-
         final Runnable refreshPeers = new Runnable() {
             @Override
             public void run() {
@@ -240,6 +238,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     @Override
     public void shutdown() {
         backgroundThreadExecutor.shutdown();
+        endpointConnectionPool.shutdown();
     }
     
     @Override
@@ -1292,6 +1291,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     public String getYieldDuration() {
         return yieldDuration;
     }
+    
+    @Override
+    public EndpointConnectionStatePool getConnectionPool() {
+        return endpointConnectionPool;
+    }
 
     @Override
     public void verifyCanDelete() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index cb2d76d..f053e65 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -198,7 +198,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                             	protocol.setRootProcessGroup(rootGroup.get());
                           	    protocol.setNodeInformant(nodeInformant);
                             	
-                            	peer = new Peer(commsSession, peerUri);
+                            	peer = new Peer(commsSession, peerUri, "nifi://localhost:" + getPort());
                             	LOG.debug("Handshaking....");
                             	protocol.handshake(peer);
                             	

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 53f998e..77ac1a9 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,34 +16,18 @@
  */
 package org.apache.nifi.remote;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.SSLContext;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
 
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
@@ -57,29 +41,20 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.remote.client.socket.EndpointConnectionState;
 import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.BadRequestException;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.TransmissionDisabledException;
 import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
 import org.apache.nifi.remote.protocol.ClientProtocol;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.sun.jersey.api.client.ClientHandlerException;
-
 public class StandardRemoteGroupPort extends RemoteGroupPort {
     public static final String USER_AGENT = "NiFi-Site-to-Site";
     public static final String CONTENT_TYPE = "application/octet-stream";
@@ -112,14 +87,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         this.transferDirection = direction;
         setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
         
-        final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
-        final File persistenceFile = new File(stateDir, remoteGroup.getIdentifier() + ".peers");
-        
-        // TODO: This should really be constructed in the RemoteProcessGroup and made available to all ports in
-        // that remote process group. This prevents too many connections from being made and also protects the persistenceFile
-        // so that only a single thread will ever attempt to write to the file at once.
-        FIXME();
-        connectionStatePool = new EndpointConnectionStatePool(sslContext, remoteGroup.getEventReporter(), persistenceFile);
+        connectionStatePool = remoteGroup.getConnectionPool();
     }
     
     @Override
@@ -145,8 +113,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         } finally {
             interruptLock.unlock();
         }
-
-    	connectionStatePool.shutdown();
     }
     
     @Override
@@ -162,11 +128,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     }
     
     
-    void cleanupSockets() {
-        connectionStatePool.cleanupExpiredSockets();
-    }
-    
-    
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         if ( !remoteGroup.isTransmitting() ) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
deleted file mode 100644
index 7474d38..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import org.apache.nifi.remote.StandardRemoteGroupPort;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformation;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.nifi.connectable.ConnectableType;
-import org.junit.Test;
-
-public class TestStandardRemoteGroupPort {
-
-    @Test
-    public void testFormulateDestinationListForOutput() throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
-        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240));
-        collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024));
-        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
-        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_OUTPUT_PORT);
-        for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
-        }
-    }
-    
-    @Test
-    public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500));
-        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_OUTPUT_PORT);
-        for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
-        }
-    }
-    
-    
-    
-    
-    @Test
-    public void testFormulateDestinationListForInputPorts() throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
-        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240));
-        collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024));
-        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
-        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_INPUT_PORT);
-        for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
-        }
-    }
-    
-    @Test
-    public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500));
-        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_INPUT_PORT);
-        for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
-        }
-    }
-}