You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/19 14:31:40 UTC
incubator-nifi git commit: NIFI-282: Continue separating site-to-site functionality into utility
Repository: incubator-nifi
Updated Branches:
refs/heads/site-to-site-client fdf758460 -> a6293e340
NIFI-282: Continue separating site-to-site functionality into utility
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a6293e34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a6293e34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a6293e34
Branch: refs/heads/site-to-site-client
Commit: a6293e34086cf449dc11be9105fe0794302b8c5a
Parents: fdf7584
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 19 08:31:34 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Jan 19 08:31:34 2015 -0500
----------------------------------------------------------------------
.../socket/EndpointConnectionStatePool.java | 4 -
.../protocol/socket/SocketClientProtocol.java | 3 +-
.../socket/TestEndpointConnectionStatePool.java | 95 +++++++++++++++++++
.../apache/nifi/groups/RemoteProcessGroup.java | 3 +
.../nifi/remote/StandardRemoteProcessGroup.java | 14 ++-
.../nifi/remote/SocketRemoteSiteListener.java | 2 +-
.../nifi/remote/StandardRemoteGroupPort.java | 41 +--------
.../remote/TestStandardRemoteGroupPort.java | 97 --------------------
8 files changed, 110 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index 2dd489d..d20fb58 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -518,10 +518,6 @@ public class EndpointConnectionStatePool {
}
-// private List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo) throws IOException {
-// return formulateDestinationList(clusterNodeInfo, getConnectableType());
-// }
-
static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation();
final int numDestinations = Math.max(128, nodeInfoSet.size());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 2f4f755..560385c 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -59,7 +59,6 @@ import org.slf4j.LoggerFactory;
public class SocketClientProtocol implements ClientProtocol {
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
-
private RemoteDestination destination;
private boolean useCompression;
@@ -105,7 +104,7 @@ public class SocketClientProtocol implements ClientProtocol {
properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
final CommunicationsSession commsSession = peer.getCommunicationsSession();
- commsSession.setTimeout((int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+ commsSession.setTimeout(timeoutMillis);
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
new file mode 100644
index 0000000..d8899ea
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.junit.Test;
+
+public class TestEndpointConnectionStatePool {
+
+ @Test
+ public void testFormulateDestinationListForOutput() throws IOException {
+ final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+ final List<NodeInformation> collection = new ArrayList<>();
+ collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
+ collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240));
+ collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024));
+ collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
+ collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
+
+ clusterNodeInfo.setNodeInformation(collection);
+ final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ for ( final PeerStatus peerStatus : destinations ) {
+ System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ }
+ }
+
+ @Test
+ public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
+ final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+ final List<NodeInformation> collection = new ArrayList<>();
+ collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500));
+ collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000));
+
+ clusterNodeInfo.setNodeInformation(collection);
+ final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ for ( final PeerStatus peerStatus : destinations ) {
+ System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ }
+ }
+
+
+
+
+ @Test
+ public void testFormulateDestinationListForInputPorts() throws IOException {
+ final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+ final List<NodeInformation> collection = new ArrayList<>();
+ collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
+ collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240));
+ collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024));
+ collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
+ collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
+
+ clusterNodeInfo.setNodeInformation(collection);
+ final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ for ( final PeerStatus peerStatus : destinations ) {
+ System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ }
+ }
+
+ @Test
+ public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
+ final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+ final List<NodeInformation> collection = new ArrayList<>();
+ collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500));
+ collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000));
+
+ clusterNodeInfo.setNodeInformation(collection);
+ final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ for ( final PeerStatus peerStatus : destinations ) {
+ System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 2e35422..9f2dac8 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -27,6 +27,7 @@ import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
public interface RemoteProcessGroup {
@@ -80,6 +81,8 @@ public interface RemoteProcessGroup {
void setYieldDuration(final String yieldDuration);
String getYieldDuration();
+
+ EndpointConnectionStatePool getConnectionPool();
/**
* Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min")
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index bfa3d25..857add9 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -175,6 +175,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
};
+ endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile());
+
final Runnable socketCleanup = new Runnable() {
@Override
public void run() {
@@ -187,14 +189,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
readLock.unlock();
}
- for (final StandardRemoteGroupPort port : ports) {
- port.cleanupSockets();
- }
+ endpointConnectionPool.cleanupExpiredSockets();
}
};
- endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile());
-
final Runnable refreshPeers = new Runnable() {
@Override
public void run() {
@@ -240,6 +238,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void shutdown() {
backgroundThreadExecutor.shutdown();
+ endpointConnectionPool.shutdown();
}
@Override
@@ -1292,6 +1291,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public String getYieldDuration() {
return yieldDuration;
}
+
+ @Override
+ public EndpointConnectionStatePool getConnectionPool() {
+ return endpointConnectionPool;
+ }
@Override
public void verifyCanDelete() {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index cb2d76d..f053e65 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -198,7 +198,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
protocol.setRootProcessGroup(rootGroup.get());
protocol.setNodeInformant(nodeInformant);
- peer = new Peer(commsSession, peerUri);
+ peer = new Peer(commsSession, peerUri, "nifi://localhost:" + getPort());
LOG.debug("Handshaking....");
protocol.handshake(peer);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 53f998e..77ac1a9 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,34 +16,18 @@
*/
package org.apache.nifi.remote;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
@@ -57,29 +41,20 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.client.socket.EndpointConnectionState;
import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.ClientProtocol;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.sun.jersey.api.client.ClientHandlerException;
-
public class StandardRemoteGroupPort extends RemoteGroupPort {
public static final String USER_AGENT = "NiFi-Site-to-Site";
public static final String CONTENT_TYPE = "application/octet-stream";
@@ -112,14 +87,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
this.transferDirection = direction;
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
- final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
- final File persistenceFile = new File(stateDir, remoteGroup.getIdentifier() + ".peers");
-
- // TODO: This should really be constructed in the RemoteProcessGroup and made available to all ports in
- // that remote process group. This prevents too many connections from being made and also protects the persistenceFile
- // so that only a single thread will ever attempt to write to the file at once.
- FIXME();
- connectionStatePool = new EndpointConnectionStatePool(sslContext, remoteGroup.getEventReporter(), persistenceFile);
+ connectionStatePool = remoteGroup.getConnectionPool();
}
@Override
@@ -145,8 +113,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} finally {
interruptLock.unlock();
}
-
- connectionStatePool.shutdown();
}
@Override
@@ -162,11 +128,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
- void cleanupSockets() {
- connectionStatePool.cleanupExpiredSockets();
- }
-
-
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
if ( !remoteGroup.isTransmitting() ) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
deleted file mode 100644
index 7474d38..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import org.apache.nifi.remote.StandardRemoteGroupPort;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformation;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.nifi.connectable.ConnectableType;
-import org.junit.Test;
-
-public class TestStandardRemoteGroupPort {
-
- @Test
- public void testFormulateDestinationListForOutput() throws IOException {
- final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
- final List<NodeInformation> collection = new ArrayList<>();
- collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
- collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240));
- collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024));
- collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
- collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
-
- clusterNodeInfo.setNodeInformation(collection);
- final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_OUTPUT_PORT);
- for ( final PeerStatus peerStatus : destinations ) {
- System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
- }
- }
-
- @Test
- public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
- final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
- final List<NodeInformation> collection = new ArrayList<>();
- collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500));
- collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000));
-
- clusterNodeInfo.setNodeInformation(collection);
- final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_OUTPUT_PORT);
- for ( final PeerStatus peerStatus : destinations ) {
- System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
- }
- }
-
-
-
-
- @Test
- public void testFormulateDestinationListForInputPorts() throws IOException {
- final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
- final List<NodeInformation> collection = new ArrayList<>();
- collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
- collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240));
- collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024));
- collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
- collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
-
- clusterNodeInfo.setNodeInformation(collection);
- final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_INPUT_PORT);
- for ( final PeerStatus peerStatus : destinations ) {
- System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
- }
- }
-
- @Test
- public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
- final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
- final List<NodeInformation> collection = new ArrayList<>();
- collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500));
- collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000));
-
- clusterNodeInfo.setNodeInformation(collection);
- final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_INPUT_PORT);
- for ( final PeerStatus peerStatus : destinations ) {
- System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
- }
- }
-}