You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2021/03/08 12:16:44 UTC
[phoenix-omid] branch master updated: OMID-198 Replace static ports
used for TSO server with random ports in the tests(Rajeshbabu)
This is an automated email from the ASF dual-hosted git repository.
rajeshbabu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-omid.git
The following commit(s) were added to refs/heads/master by this push:
new b9c8caf OMID-198 Replace static ports used for TSO server with random ports in the tests(Rajeshbabu)
b9c8caf is described below
commit b9c8cafe611fc9e31f02df11503310a35dd91ae1
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
AuthorDate: Mon Mar 8 17:45:59 2021 +0530
OMID-198 Replace static ports used for TSO server with random ports in the tests(Rajeshbabu)
---
.../main/java/org/apache/omid/NetworkUtils.java | 23 +-
.../org/apache/omid/transaction/OmidTestBase.java | 17 +-
.../org/apache/omid/transaction/TestFilters.java | 5 +-
.../apache/omid/transaction/TestOmidLLRaces.java | 11 +-
.../apache/omid/transaction/TestShadowCells.java | 9 +-
.../omid/tso/TestTSOChannelHandlerNetty.java | 329 +++++++++++----------
.../test/java/org/apache/omid/tso/TestTSOLL.java | 14 +-
7 files changed, 222 insertions(+), 186 deletions(-)
diff --git a/common/src/main/java/org/apache/omid/NetworkUtils.java b/common/src/main/java/org/apache/omid/NetworkUtils.java
index d6e1920..c78f7ea 100644
--- a/common/src/main/java/org/apache/omid/NetworkUtils.java
+++ b/common/src/main/java/org/apache/omid/NetworkUtils.java
@@ -17,10 +17,8 @@
*/
package org.apache.omid;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
+import java.io.IOException;
+import java.net.*;
import java.util.Collections;
import java.util.Enumeration;
@@ -70,4 +68,21 @@ public class NetworkUtils {
throw new IllegalArgumentException(String.format("No network '%s*'/'%s*' interfaces found",
MAC_TSO_NET_IFACE_PREFIX, LINUX_TSO_NET_IFACE_PREFIX));
}
+
+ /**
+ * Picks a free port on the host by binding a ServerSocket to port 0; the socket is closed before returning, so the port is not reserved.
+ */
+ public static int getFreePort() throws IOException {
+ ServerSocket s = new ServerSocket(0);
+ try {
+ s.setReuseAddress(true);
+ int port = s.getLocalPort();
+ return port;
+ } finally {
+ if (null != s) {
+ s.close();
+ }
+ }
+ }
+
}
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
index 7256af9..6edbcb5 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.NetworkUtils;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.InMemoryCommitTable;
@@ -75,6 +76,7 @@ public abstract class OmidTestBase {
protected static final String TEST_TABLE = "test";
protected static final String TEST_FAMILY = "data";
static final String TEST_FAMILY2 = "data2";
+ public static int port;
private HBaseCommitTableConfig hBaseCommitTableConfig;
@@ -88,7 +90,8 @@ public abstract class OmidTestBase {
public void beforeGroups(ITestContext context) throws Exception {
// TSO Setup
TSOServerConfig tsoConfig = new TSOServerConfig();
- tsoConfig.setPort(1234);
+ port = NetworkUtils.getFreePort();
+ tsoConfig.setPort(port);
tsoConfig.setConflictMapSize(1000);
tsoConfig.setWaitStrategy("LOW_CPU");
tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
@@ -99,12 +102,12 @@ public abstract class OmidTestBase {
HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
tso.startAsync();
tso.awaitRunning();
- TestUtils.waitForSocketListening("localhost", 1234, 100);
+ TestUtils.waitForSocketListening("localhost", port, 100);
LOG.info("Finished loading TSO");
context.setAttribute("tso", tso);
OmidClientConfiguration clientConf = new OmidClientConfiguration();
- clientConf.setConnectionString("localhost:1234");
+ clientConf.setConnectionString("localhost:" + port);
context.setAttribute("clientConf", clientConf);
InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
@@ -177,7 +180,7 @@ public abstract class OmidTestBase {
protected TransactionManager newTransactionManager(ITestContext context, PostCommitActions postCommitActions) throws Exception {
HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
- clientConf.setConnectionString("localhost:1234");
+ clientConf.setConnectionString("localhost:" + port);
clientConf.setHBaseConfiguration(hbaseConf);
return HBaseTransactionManager.builder(clientConf)
.postCommitter(postCommitActions)
@@ -188,7 +191,7 @@ public abstract class OmidTestBase {
protected TransactionManager newTransactionManager(ITestContext context, TSOClient tsoClient) throws Exception {
HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
- clientConf.setConnectionString("localhost:1234");
+ clientConf.setConnectionString("localhost:" + port);
clientConf.setHBaseConfiguration(hbaseConf);
return HBaseTransactionManager.builder(clientConf)
.commitTableClient(getCommitTable(context).getClient())
@@ -199,7 +202,7 @@ public abstract class OmidTestBase {
protected TransactionManager newTransactionManager(ITestContext context, CommitTable.Client commitTableClient)
throws Exception {
HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
- clientConf.setConnectionString("localhost:1234");
+ clientConf.setConnectionString("localhost:" + port);
clientConf.setHBaseConfiguration(hbaseConf);
return HBaseTransactionManager.builder(clientConf)
.commitTableClient(commitTableClient)
@@ -217,7 +220,7 @@ public abstract class OmidTestBase {
getClient(context).close().get();
getTSO(context).stopAsync();
getTSO(context).awaitTerminated();
- TestUtils.waitForSocketNotListening("localhost", 1234, 1000);
+ TestUtils.waitForSocketNotListening("localhost", port, 1000);
}
@AfterMethod(groups = "sharedHBase", timeOut = 60_000)
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
index efe33b3..c908baf 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
@@ -71,9 +71,8 @@ public class TestFilters extends OmidTestBase {
private void testGet(ITestContext context, Filter f) throws Exception {
CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
-
HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
- hbaseOmidClientConf.setConnectionString("localhost:1234");
+ hbaseOmidClientConf.setConnectionString("localhost:" + port);
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
TTable table = new TTable(connection, TEST_TABLE);
@@ -123,7 +122,7 @@ public class TestFilters extends OmidTestBase {
CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
- hbaseOmidClientConf.getOmidClientConfiguration().setConnectionString("localhost:1234");
+ hbaseOmidClientConf.getOmidClientConfiguration().setConnectionString("localhost:" + port);
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
TTable table = new TTable(connection, TEST_TABLE);
PostCommitActions syncPostCommitter = spy(
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestOmidLLRaces.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestOmidLLRaces.java
index e680f99..3de0c72 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestOmidLLRaces.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestOmidLLRaces.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.NetworkUtils;
import org.apache.omid.committable.hbase.KeyGenerator;
import org.apache.omid.committable.hbase.KeyGeneratorImplementations;
@@ -90,12 +91,14 @@ public class TestOmidLLRaces {
private static final Logger LOG = LoggerFactory.getLogger(TestOmidLLRaces.class);
private TSOClient client;
+ private int port;
@BeforeClass
public void setup() throws Exception {
// TSO Setup
TSOServerConfig tsoConfig = new TSOServerConfig();
- tsoConfig.setPort(1234);
+ port = NetworkUtils.getFreePort();
+ tsoConfig.setPort(port);
tsoConfig.setConflictMapSize(1000);
tsoConfig.setLowLatency(true);
tsoConfig.setWaitStrategy("LOW_CPU");
@@ -105,11 +108,11 @@ public class TestOmidLLRaces {
HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
tso.startAsync();
tso.awaitRunning();
- TestUtils.waitForSocketListening("localhost", 1234, 100);
+ TestUtils.waitForSocketListening("localhost", port, 100);
LOG.info("Finished loading TSO");
OmidClientConfiguration clientConf = new OmidClientConfiguration();
- clientConf.setConnectionString("localhost:1234");
+ clientConf.setConnectionString("localhost:" + port);
// Create the associated Handler
client = TSOClient.newInstance(clientConf);
@@ -160,7 +163,7 @@ public class TestOmidLLRaces {
protected TransactionManager newTransactionManagerHBaseCommitTable(TSOClient tsoClient) throws Exception {
HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
- clientConf.setConnectionString("localhost:1234");
+ clientConf.setConnectionString("localhost:" + port);
clientConf.setHBaseConfiguration(hbaseConf);
return HBaseTransactionManager.builder(clientConf)
.tsoClient(tsoClient).build();
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index 14a2e02..12e2d50 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -71,7 +71,6 @@ public class TestShadowCells extends OmidTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TestShadowCells.class);
private static final String TSO_SERVER_HOST = "localhost";
- private static final int TSO_SERVER_PORT = 1234;
private static final String TEST_TABLE = "test";
private static final String TEST_FAMILY = "data";
@@ -117,7 +116,7 @@ public class TestShadowCells extends OmidTestBase {
CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
- hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + port);
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
TransactionManager tm2 = HBaseTransactionManager.builder(hbaseOmidClientConf)
.commitTableClient(commitTableClient)
@@ -138,7 +137,7 @@ public class TestShadowCells extends OmidTestBase {
CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
- hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + port);
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
PostCommitActions syncPostCommitter = spy(
new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
@@ -186,7 +185,7 @@ public class TestShadowCells extends OmidTestBase {
CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
- hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + port);
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
PostCommitActions syncPostCommitter = spy(
new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
@@ -247,7 +246,7 @@ public class TestShadowCells extends OmidTestBase {
CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
- hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + port);
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
PostCommitActions syncPostCommitter = spy(
new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
index 9a0a15a..88988e1 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import org.apache.omid.NetworkUtils;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.proto.TSOProto;
@@ -45,6 +46,7 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -60,6 +62,8 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
@SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"})
public class TestTSOChannelHandlerNetty {
@@ -70,186 +74,198 @@ public class TestTSOChannelHandlerNetty {
private
RequestProcessor requestProcessor;
- // Component under test
- private TSOChannelHandler channelHandler;
-
@BeforeMethod
public void beforeTestMethod() {
MockitoAnnotations.initMocks(this);
- TSOServerConfig config = new TSOServerConfig();
- config.setPort(1434);
- channelHandler = new TSOChannelHandler(config, requestProcessor, new NullMetricsProvider());
}
- @AfterMethod
- public void afterTestMethod() throws IOException {
- channelHandler.close();
+ private TSOChannelHandler getTSOChannelHandler(int port) {
+ TSOServerConfig config = new TSOServerConfig();
+ config.setPort(port);
+ return new TSOChannelHandler(config, requestProcessor, new NullMetricsProvider());
}
@Test(timeOut = 10_000)
public void testMainAPI() throws Exception {
-
- // Check initial state
- assertNull(channelHandler.listeningChannel);
- assertNull(channelHandler.channelGroup);
-
- // Check initial connection
- channelHandler.reconnect();
- assertTrue(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 1);
- assertEquals(((InetSocketAddress) channelHandler.listeningChannel.getLocalAddress()).getPort(), 1434);
-
- // Check connection close
- channelHandler.closeConnection();
- assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
-
- // Check re-closing connection
- channelHandler.closeConnection();
- assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
-
- // Check connection after closing
- channelHandler.reconnect();
- assertTrue(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 1);
-
- // Check re-connection
- channelHandler.reconnect();
- assertTrue(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 1);
-
- // Exercise closeable with re-connection trial
- channelHandler.close();
- assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
+ int port = NetworkUtils.getFreePort();
+ TSOChannelHandler channelHandler = getTSOChannelHandler(port);
try {
+ // Check initial state
+ assertNull(channelHandler.listeningChannel);
+ assertNull(channelHandler.channelGroup);
+
+ // Check initial connection
channelHandler.reconnect();
- } catch (ChannelException e) {
- // Expected: Can't reconnect after closing
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 1);
+ assertEquals(((InetSocketAddress) channelHandler.listeningChannel.getLocalAddress()).getPort(), port);
+
+ // Check connection close
+ channelHandler.closeConnection();
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+
+ // Check re-closing connection
+ channelHandler.closeConnection();
assertFalse(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.channelGroup.size(), 0);
- }
+ // Check connection after closing
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 1);
+
+ // Check re-connection
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 1);
+
+ // Exercise closeable with re-connection trial
+ channelHandler.close();
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+ try {
+ channelHandler.reconnect();
+ fail("Can't reconnect after closing");
+ } catch (ChannelException e) {
+ // Expected: Can't reconnect after closing
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+ }
+ } finally {
+ if(channelHandler != null) channelHandler.close();
+ }
}
@Test(timeOut = 10_000)
public void testNettyConnectionToTSOFromClient() throws Exception {
+ int port = NetworkUtils.getFreePort();
+ TSOChannelHandler channelHandler = getTSOChannelHandler(port);
+ try {
+ ClientBootstrap nettyClient = createNettyClientBootstrap();
+
+ ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test the client can't connect because the server is not there
+ // ------------------------------------------------------------------------------------------------------------
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertFalse(channelF.isSuccess());
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test creation of a server connection
+ // ------------------------------------------------------------------------------------------------------------
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ // Eventually the channel group of the server should contain the listening channel
+ assertEquals(channelHandler.channelGroup.size(), 1);
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test that a client can connect now
+ // ------------------------------------------------------------------------------------------------------------
+ channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertTrue(channelF.isSuccess());
+ assertTrue(channelF.getChannel().isConnected());
+ // Eventually the channel group of the server should have 2 elements
+ while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Close the channel on the client side and test we have one element less in the channel group
+ // ------------------------------------------------------------------------------------------------------------
+ channelF.getChannel().close().await();
+ // Eventually the channel group of the server should have only one element
+ while (channelHandler.channelGroup.size() != 1) /** do nothing */ ;
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Open a new channel and test the connection closing on the server side through the channel handler
+ // ------------------------------------------------------------------------------------------------------------
+ channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertTrue(channelF.isSuccess());
+ // Eventually the channel group of the server should have 2 elements again
+ while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ channelHandler.closeConnection();
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+ // Wait some time and check the channel was closed
+ TimeUnit.SECONDS.sleep(1);
+ assertFalse(channelF.getChannel().isOpen());
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test server re-connections with connected clients
+ // ------------------------------------------------------------------------------------------------------------
+ // Connect first time
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ // Eventually the channel group of the server should contain the listening channel
+ assertEquals(channelHandler.channelGroup.size(), 1);
+ // Check the client can connect
+ channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertTrue(channelF.isSuccess());
+ // Eventually the channel group of the server should have 2 elements
+ while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ // Re-connect and check that client connection was gone
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ // Eventually the channel group of the server should contain the listening channel
+ assertEquals(channelHandler.channelGroup.size(), 1);
+ // Wait some time and check the channel was closed
+ TimeUnit.SECONDS.sleep(1);
+ assertFalse(channelF.getChannel().isOpen());
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test closeable interface with re-connection trial
+ // ------------------------------------------------------------------------------------------------------------
+ channelHandler.close();
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+ } finally {
+ if (channelHandler != null) channelHandler.close();
+ }
- ClientBootstrap nettyClient = createNettyClientBootstrap();
-
- ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
-
- // ------------------------------------------------------------------------------------------------------------
- // Test the client can't connect cause the server is not there
- // ------------------------------------------------------------------------------------------------------------
- while (!channelF.isDone()) /** do nothing */ ;
- assertFalse(channelF.isSuccess());
-
- // ------------------------------------------------------------------------------------------------------------
- // Test creation of a server connection
- // ------------------------------------------------------------------------------------------------------------
- channelHandler.reconnect();
- assertTrue(channelHandler.listeningChannel.isOpen());
- // Eventually the channel group of the server should contain the listening channel
- assertEquals(channelHandler.channelGroup.size(), 1);
-
- // ------------------------------------------------------------------------------------------------------------
- // Test that a client can connect now
- // ------------------------------------------------------------------------------------------------------------
- channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
- while (!channelF.isDone()) /** do nothing */ ;
- assertTrue(channelF.isSuccess());
- assertTrue(channelF.getChannel().isConnected());
- // Eventually the channel group of the server should have 2 elements
- while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
-
- // ------------------------------------------------------------------------------------------------------------
- // Close the channel on the client side and test we have one element less in the channel group
- // ------------------------------------------------------------------------------------------------------------
- channelF.getChannel().close().await();
- // Eventually the channel group of the server should have only one element
- while (channelHandler.channelGroup.size() != 1) /** do nothing */ ;
-
- // ------------------------------------------------------------------------------------------------------------
- // Open a new channel and test the connection closing on the server side through the channel handler
- // ------------------------------------------------------------------------------------------------------------
- channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
- while (!channelF.isDone()) /** do nothing */ ;
- assertTrue(channelF.isSuccess());
- // Eventually the channel group of the server should have 2 elements again
- while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
- channelHandler.closeConnection();
- assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
- // Wait some time and check the channel was closed
- TimeUnit.SECONDS.sleep(1);
- assertFalse(channelF.getChannel().isOpen());
-
- // ------------------------------------------------------------------------------------------------------------
- // Test server re-connections with connected clients
- // ------------------------------------------------------------------------------------------------------------
- // Connect first time
- channelHandler.reconnect();
- assertTrue(channelHandler.listeningChannel.isOpen());
- // Eventually the channel group of the server should contain the listening channel
- assertEquals(channelHandler.channelGroup.size(), 1);
- // Check the client can connect
- channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
- while (!channelF.isDone()) /** do nothing */ ;
- assertTrue(channelF.isSuccess());
- // Eventually the channel group of the server should have 2 elements
- while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
- // Re-connect and check that client connection was gone
- channelHandler.reconnect();
- assertTrue(channelHandler.listeningChannel.isOpen());
- // Eventually the channel group of the server should contain the listening channel
- assertEquals(channelHandler.channelGroup.size(), 1);
- // Wait some time and check the channel was closed
- TimeUnit.SECONDS.sleep(1);
- assertFalse(channelF.getChannel().isOpen());
-
- // ------------------------------------------------------------------------------------------------------------
- // Test closeable interface with re-connection trial
- // ------------------------------------------------------------------------------------------------------------
- channelHandler.close();
- assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
}
@Test(timeOut = 10_000)
public void testNettyChannelWriting() throws Exception {
+ int port = NetworkUtils.getFreePort();
+ TSOChannelHandler channelHandler = getTSOChannelHandler(port);
+ try {
+ // ------------------------------------------------------------------------------------------------------------
+ // Prepare test
+ // ------------------------------------------------------------------------------------------------------------
+
+ // Connect channel handler
+ channelHandler.reconnect();
+ // Create client and connect it
+ ClientBootstrap nettyClient = createNettyClientBootstrap();
+ ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
+ // Basic checks for connection
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertTrue(channelF.isSuccess());
+ assertTrue(channelF.getChannel().isConnected());
+ Channel channel = channelF.getChannel();
+ // Eventually the channel group of the server should have 2 elements
+ while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ // Write first handshake request
+ TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
+ // NOTE: Add here the required handshake capabilities when necessary
+ handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
+ channelF.getChannel().write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test channel writing
+ // ------------------------------------------------------------------------------------------------------------
+ testWritingTimestampRequest(channel);
+
+ testWritingCommitRequest(channel);
+
+ testWritingFenceRequest(channel);
+ } finally {
+ if(channelHandler != null) channelHandler.close();
+ }
- // ------------------------------------------------------------------------------------------------------------
- // Prepare test
- // ------------------------------------------------------------------------------------------------------------
-
- // Connect channel handler
- channelHandler.reconnect();
- // Create client and connect it
- ClientBootstrap nettyClient = createNettyClientBootstrap();
- ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
- // Basic checks for connection
- while (!channelF.isDone()) /** do nothing */ ;
- assertTrue(channelF.isSuccess());
- assertTrue(channelF.getChannel().isConnected());
- Channel channel = channelF.getChannel();
- // Eventually the channel group of the server should have 2 elements
- while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
- // Write first handshake request
- TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
- // NOTE: Add here the required handshake capabilities when necessary
- handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
- channelF.getChannel().write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
-
- // ------------------------------------------------------------------------------------------------------------
- // Test channel writing
- // ------------------------------------------------------------------------------------------------------------
- testWritingTimestampRequest(channel);
-
- testWritingCommitRequest(channel);
-
- testWritingFenceRequest(channel);
}
private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
@@ -335,5 +351,4 @@ public class TestTSOChannelHandlerNetty {
});
return bootstrap;
}
-
}
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
index 99cc77e..c40dc84 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
@@ -18,6 +18,7 @@
package org.apache.omid.tso;
+import org.apache.omid.NetworkUtils;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import com.google.inject.Guice;
@@ -46,7 +47,7 @@ public class TestTSOLL {
private static final Logger LOG = LoggerFactory.getLogger(TestTSOLL.class);
private static final String TSO_SERVER_HOST = "localhost";
- private static final int TSO_SERVER_PORT = 1234;
+ private int port;
private OmidClientConfiguration tsoClientConf;
@@ -70,7 +71,8 @@ public class TestTSOLL {
TSOServerConfig tsoConfig = new TSOServerConfig();
tsoConfig.setLowLatency(true);
tsoConfig.setConflictMapSize(1000);
- tsoConfig.setPort(TSO_SERVER_PORT);
+ port = NetworkUtils.getFreePort();
+ tsoConfig.setPort(port);
tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
tsoConfig.setNumConcurrentCTWriters(2);
Module tsoServerMockModule = new TSOMockModule(tsoConfig);
@@ -83,7 +85,7 @@ public class TestTSOLL {
tsoServer = injector.getInstance(TSOServer.class);
tsoServer.startAsync();
tsoServer.awaitRunning();
- TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
+ TestUtils.waitForSocketListening(TSO_SERVER_HOST, port, 100);
LOG.info("==================================================================================================");
LOG.info("===================================== TSO Server Initialized =====================================");
@@ -93,7 +95,7 @@ public class TestTSOLL {
commitTable = injector.getInstance(CommitTable.class);
OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
- tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + port);
this.tsoClientConf = tsoClientConf;
commitTable = injector.getInstance(CommitTable.class);
@@ -107,7 +109,7 @@ public class TestTSOLL {
tsoServer.stopAsync();
tsoServer.awaitTerminated();
tsoServer = null;
- TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
+ TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, port, 1000);
pausableTSOracle.resume();
@@ -117,7 +119,7 @@ public class TestTSOLL {
public void testNoWriteToCommitTable() throws Exception {
TSOClient client = TSOClient.newInstance(tsoClientConf);
- TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, port);
long ts1 = client.getNewStartTimestamp().get();
TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));