You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ik...@apache.org on 2016/04/21 02:02:26 UTC
[09/52] [abbrv] incubator-omid git commit: Rename tsoclient package
to tso.client
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientRaw.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientRaw.java b/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientRaw.java
new file mode 100644
index 0000000..9a68c39
--- /dev/null
+++ b/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientRaw.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.yahoo.omid.proto.TSOProto;
+import com.yahoo.omid.proto.TSOProto.Response;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Raw client for communicating with tso server directly with protobuf messages
+ */
+public class TSOClientRaw {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TSOClientRaw.class);
+
+ private final BlockingQueue<SettableFuture<Response>> responseQueue
+ = new ArrayBlockingQueue<SettableFuture<Response>>(5);
+ private final Channel channel;
+
+ /**
+  * Opens a raw connection to a TSO server and installs the protobuf codec pipeline.
+  *
+  * @param host TSO server host name
+  * @param port TSO server port
+  * @throws InterruptedException if the calling thread is interrupted while waiting for the
+  *                              connection attempt to finish
+  * @throws ExecutionException   if the connection attempt finishes unsuccessfully
+  */
+ public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
+     // Start client with Nb of active threads = 3 as maximum.
+     ChannelFactory factory = new NioClientSocketChannelFactory(
+             Executors.newCachedThreadPool(
+                     new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
+             Executors.newCachedThreadPool(
+                     new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
+     // Create the bootstrap
+     ClientBootstrap bootstrap = new ClientBootstrap(factory);
+
+     InetSocketAddress addr = new InetSocketAddress(host, port);
+
+     ChannelPipeline pipeline = bootstrap.getPipeline();
+     // Frames carry a 4-byte length prefix (max frame 8 KiB) that is stripped on decode
+     pipeline.addLast("lengthbaseddecoder",
+             new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+     pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+     pipeline.addLast("protobufdecoder",
+             new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+     pipeline.addLast("protobufencoder", new ProtobufEncoder());
+
+     Handler handler = new Handler();
+     pipeline.addLast("handler", handler);
+
+     bootstrap.setOption("tcpNoDelay", true);
+     bootstrap.setOption("keepAlive", true);
+     bootstrap.setOption("reuseAddress", true);
+     bootstrap.setOption("connectTimeoutMillis", 100);
+
+     ChannelFuture channelFuture = bootstrap.connect(addr).await();
+     if (!channelFuture.isSuccess()) {
+         // Do not hand out a dead channel: free the Netty threads and report the failure
+         bootstrap.releaseExternalResources();
+         throw new ExecutionException("Could not connect to TSO at " + addr, channelFuture.getCause());
+     }
+     channel = channelFuture.getChannel();
+ }
+
+ /**
+ * Writes the given request to the channel opened by the constructor. The outcome of the
+ * write is neither awaited nor checked here.
+ *
+ * @param request protobuf request to send
+ */
+ public void write(TSOProto.Request request) {
+ channel.write(request);
+ }
+
+ /**
+  * Enqueues a fresh future for the channel handler to complete with the next event it
+  * receives (a response, an exception or a disconnection).
+  *
+  * @return the future that was enqueued
+  * @throws InterruptedException if interrupted while waiting for room in the bounded queue
+  */
+ public Future<Response> getResponse() throws InterruptedException {
+     final SettableFuture<Response> pending = SettableFuture.create();
+     responseQueue.put(pending);
+     return pending;
+ }
+
+ /**
+  * Closes the channel.
+  *
+  * @throws InterruptedException if interrupted while waiting for room in the bounded queue
+  */
+ public void close() throws InterruptedException {
+     // The handler's channelDisconnected() takes one future from the queue, so a throwaway
+     // one is enqueued first (presumably so that take() has something to consume).
+     final SettableFuture<Response> placeholder = SettableFuture.create();
+     responseQueue.put(placeholder);
+     channel.close();
+ }
+
+ /**
+  * Last handler of the pipeline: hands every channel event to the oldest future queued in
+  * {@code responseQueue}.
+  * <p>
+  * Each callback uses {@code responseQueue.take()}, which blocks the calling thread until a
+  * future is available.
+  */
+ private class Handler extends SimpleChannelHandler {
+     /**
+      * Completes the next queued future with the received {@link Response}; any other
+      * message type is only logged.
+      */
+     @Override
+     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+         // Without a "{}" placeholder SLF4J drops a non-Throwable argument
+         LOG.info("Message received {}", e);
+         if (e.getMessage() instanceof Response) {
+             Response resp = (Response) e.getMessage();
+             try {
+                 SettableFuture<Response> future = responseQueue.take();
+                 future.set(resp);
+             } catch (InterruptedException ie) {
+                 Thread.currentThread().interrupt();
+                 LOG.warn("Interrupted in handler", ie);
+             }
+         } else {
+             LOG.warn("Received unknown message {}", e.getMessage());
+         }
+     }
+
+     /** Fails the next queued future with the cause of the channel exception. */
+     @Override
+     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+         LOG.info("Exception received", e.getCause());
+         try {
+             SettableFuture<Response> future = responseQueue.take();
+             future.setException(e.getCause());
+         } catch (InterruptedException ie) {
+             Thread.currentThread().interrupt();
+             LOG.warn("Interrupted handling exception", ie);
+         }
+     }
+
+     /** Fails the next queued future with a {@code ConnectionException}. */
+     @Override
+     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
+             throws Exception {
+         LOG.info("Disconnected");
+         try {
+             SettableFuture<Response> future = responseQueue.take();
+             future.setException(new ConnectionException());
+         } catch (InterruptedException ie) {
+             Thread.currentThread().interrupt();
+             LOG.warn("Interrupted handling exception", ie);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
new file mode 100644
index 0000000..c89d05e
--- /dev/null
+++ b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.yahoo.omid.TestUtils;
+import com.yahoo.omid.committable.CommitTable;
+import com.yahoo.omid.tso.TSOMockModule;
+import com.yahoo.omid.tso.TSOServer;
+import com.yahoo.omid.tso.TSOServerConfig;
+import com.yahoo.omid.tso.util.DummyCellIdImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertNotNull;
+
+public class TestIntegrationOfTSOClientServerBasicFunctionality {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestIntegrationOfTSOClientServerBasicFunctionality.class);
+
+ private static final String TSO_SERVER_HOST = "localhost";
+ private int tsoServerPortForTest;
+
+ // Cells for tests
+ private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
+ private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
+
+ // Required infrastructure for TSO tsoClient-server integration testing
+ private TSOServer tsoServer;
+ private TSOClient tsoClient;
+ private TSOClient justAnotherTSOClient;
+ private CommitTable.Client commitTableClient;
+
+ /**
+ * Starts one TSO server on a free local port (wired through {@code TSOMockModule}) and
+ * creates the two TSO clients plus the commit table client shared by all tests of this class.
+ */
+ @BeforeClass
+ public void setup() throws Exception {
+
+ tsoServerPortForTest = TestUtils.getFreeLocalPort();
+
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ tsoConfig.setMaxItems(1000);
+ tsoConfig.setPort(tsoServerPortForTest);
+ Module tsoServerMockModule = new TSOMockModule(tsoConfig);
+ Injector injector = Guice.createInjector(tsoServerMockModule);
+
+ // The commit table client comes from the same injector that builds the server
+ CommitTable commitTable = injector.getInstance(CommitTable.class);
+ commitTableClient = commitTable.getClient();
+
+ LOG.info("==================================================================================================");
+ LOG.info("======================================= Init TSO Server ==========================================");
+ LOG.info("==================================================================================================");
+
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ // Clients are only created once the server socket is reported as listening
+ TestUtils.waitForSocketListening(TSO_SERVER_HOST, tsoServerPortForTest, 100);
+
+ LOG.info("==================================================================================================");
+ LOG.info("===================================== TSO Server Initialized =====================================");
+ LOG.info("==================================================================================================");
+
+ LOG.info("==================================================================================================");
+ LOG.info("======================================= Setup TSO Clients ========================================");
+ LOG.info("==================================================================================================");
+
+ // Configure direct connection to the server
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + tsoServerPortForTest);
+
+ // Both clients share the same configuration and therefore target the same server
+ tsoClient = TSOClient.newInstance(tsoClientConf);
+ justAnotherTSOClient = TSOClient.newInstance(tsoClientConf);
+
+ LOG.info("==================================================================================================");
+ LOG.info("===================================== TSO Clients Initialized ====================================");
+ LOG.info("==================================================================================================");
+
+ Thread.currentThread().setName("Test Thread");
+
+ }
+
+ /**
+  * Closes both TSO clients created in {@code setup()}, stops the TSO server and waits until
+  * its port is no longer listening.
+  */
+ @AfterClass
+ public void tearDown() throws Exception {
+
+     tsoClient.close().get();
+     // The second client was previously left open
+     justAnotherTSOClient.close().get();
+
+     tsoServer.stopAndWait();
+     tsoServer = null;
+     TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, tsoServerPortForTest, 1000);
+
+ }
+
+ /**
+  * Checks that consecutive start and commit timestamps handed out by the TSO each increase
+  * by exactly one with respect to the previous one.
+  */
+ @Test(timeOut = 30_000)
+ public void testTimestampsOrderingGrowMonotonically() throws Exception {
+     final long startTsTx1 = tsoClient.getNewStartTimestamp().get();
+
+     final long startTsTx2 = tsoClient.getNewStartTimestamp().get();
+     assertEquals(startTsTx2, startTsTx1 + 1, "Should grow monotonically");
+     assertTrue(startTsTx2 > startTsTx1, "Two timestamps obtained consecutively should grow");
+
+     // Tx2 commits before Tx1 on purpose; the cells do not overlap
+     final long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1)).get();
+     assertEquals(commitTsTx2, startTsTx1 + 2, "Should grow monotonically");
+
+     final long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c2)).get();
+     assertEquals(commitTsTx1, startTsTx1 + 3, "Should grow monotonically");
+
+     final long startTsTx3 = tsoClient.getNewStartTimestamp().get();
+     assertEquals(startTsTx3, startTsTx1 + 4, "Should grow monotonically");
+ }
+
+ /** Checks that a transaction with an empty write set commits with a commit TS above its start TS. */
+ @Test(timeOut = 30_000)
+ public void testSimpleTransactionWithNoWriteSetCanCommit() throws Exception {
+     final long startTs = tsoClient.getNewStartTimestamp().get();
+     final Set<CellId> emptyWriteSet = Sets.<CellId>newHashSet();
+     final long commitTs = tsoClient.commit(startTs, emptyWriteSet).get();
+     assertTrue(commitTs > startTs);
+ }
+
+ /** Checks that a transaction writing one million distinct cells can still commit. */
+ @Test(timeOut = 30_000)
+ public void testTransactionWithMassiveWriteSetCanCommit() throws Exception {
+     final long startTimestamp = tsoClient.getNewStartTimestamp().get();
+
+     final int writeSetSize = 1_000_000;
+     final Set<CellId> writeSet = new HashSet<>();
+     int cellIndex = 0;
+     while (cellIndex < writeSetSize) {
+         writeSet.add(new DummyCellIdImpl(cellIndex));
+         cellIndex++;
+     }
+
+     final long commitTimestamp = tsoClient.commit(startTimestamp, writeSet).get();
+     assertTrue(commitTimestamp > startTimestamp, "Commit TS should be higher than Start TS");
+ }
+
+ /**
+ * Runs three transactions strictly one after another (each starts after the previous one
+ * committed) over overlapping cells and checks that all of them commit.
+ */
+ @Test(timeOut = 30_000)
+ public void testMultipleSerialCommitsDoNotConflict() throws Exception {
+ // Tx1 writes c1
+ long startTsTx1 = tsoClient.getNewStartTimestamp().get();
+ long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
+ assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be greater than Start TS");
+
+ // Tx2 starts after Tx1 committed and writes c1 and c2
+ long startTsTx2 = tsoClient.getNewStartTimestamp().get();
+ assertTrue(startTsTx2 > commitTsTx1, "TS should grow monotonically");
+
+ long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
+ assertTrue(commitTsTx2 > startTsTx2, "Commit TS must be greater than Start TS");
+
+ // Tx3 starts after Tx2 committed and writes c2
+ long startTsTx3 = tsoClient.getNewStartTimestamp().get();
+ long commitTsTx3 = tsoClient.commit(startTsTx3, Sets.newHashSet(c2)).get();
+ assertTrue(commitTsTx3 > startTsTx3, "Commit TS must be greater than Start TS");
+ }
+
+ /**
+ * Checks that the commit table has no entry for a transaction before it commits, and that
+ * after the commit it reports the same commit timestamp that commit() returned.
+ */
+ @Test(timeOut = 30_000)
+ public void testCommitWritesToCommitTable() throws Exception {
+ long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
+ long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
+ assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
+
+ assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
+ "Commit TS for Tx1 shouldn't appear in Commit Table");
+
+ long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
+ assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
+
+ // NOTE(review): the optional is dereferenced without an isPresent() check, so a missing
+ // entry would surface as an exception from get() rather than as an assertion failure.
+ Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
+ // AssertJUnit.assertNotNull takes the message first
+ assertNotNull("Tx is committed, should return as such from Commit Table", commitTs1InCommitTable);
+ assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
+ "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
+ // Tx2 started before Tx1 committed, so Tx1's commit TS must be above Tx2's start TS
+ assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+ }
+
+ /**
+  * Starts two concurrent transactions that both write c1: the first commit must succeed and
+  * the second must fail with an {@code AbortException} as cause.
+  */
+ @Test(timeOut = 30_000)
+ public void testTwoConcurrentTxWithOverlappingWritesetsHaveConflicts() throws Exception {
+     long startTsTx1 = tsoClient.getNewStartTimestamp().get();
+     long startTsTx2 = tsoClient.getNewStartTimestamp().get();
+     assertTrue(startTsTx2 > startTsTx1, "Second TX should have higher TS");
+
+     long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
+     assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be higher than Start TS for the same tx");
+
+     try {
+         tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
+         Assert.fail("Second TX should fail on commit");
+     } catch (ExecutionException ee) {
+         // TestNG order is (actual, expected, message)
+         assertEquals(ee.getCause().getClass(), AbortException.class, "Should have aborted");
+     }
+ }
+
+ /**
+  * Checks conflict detection on one client and that a timestamp obtained afterwards through
+  * a second client is above the commit timestamp recorded for the first transaction.
+  */
+ @Test(timeOut = 30_000)
+ public void testConflictsAndMonotonicallyTimestampGrowthWithTwoDifferentTSOClients() throws Exception {
+     long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
+     long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
+     long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
+
+     tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
+     try {
+         tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
+         Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
+     } catch (ExecutionException ee) {
+         // TestNG order is (actual, expected, message)
+         assertEquals(ee.getCause().getClass(), AbortException.class, "Should have aborted");
+     }
+     long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
+
+     assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
+     long commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
+     assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
+     assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientConnectionToTSO.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientConnectionToTSO.java b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientConnectionToTSO.java
new file mode 100644
index 0000000..581e5ce
--- /dev/null
+++ b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientConnectionToTSO.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.yahoo.omid.TestUtils;
+import com.yahoo.omid.tso.HALeaseManagementModule;
+import com.yahoo.omid.tso.TSOMockModule;
+import com.yahoo.omid.tso.TSOServer;
+import com.yahoo.omid.tso.TSOServerConfig;
+import com.yahoo.omid.tso.VoidLeaseManagementModule;
+import com.yahoo.statemachine.StateMachine.FsmImpl;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TestTSOClientConnectionToTSO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class);
+
+ // Constants and variables for component connectivity
+ private static final String TSO_HOST = "localhost";
+ private static final String CURRENT_TSO_PATH = "/current_tso_path";
+ private static final String TSO_LEASE_PATH = "/tso_lease_path";
+
+ private int tsoPortForTest;
+ private String zkClusterForTest;
+
+ private Injector injector = null;
+
+ private TestingServer zkServer;
+
+ private CuratorFramework zkClient;
+ private TSOServer tsoServer;
+
+ /**
+  * Picks free ports for the TSO and for ZooKeeper, starts a testing ZK server with a
+  * connected client and removes the current-TSO znode if there is one.
+  */
+ @BeforeMethod
+ public void beforeMethod() throws Exception {
+
+     tsoPortForTest = TestUtils.getFreeLocalPort();
+
+     final int zkPort = TestUtils.getFreeLocalPort();
+     zkClusterForTest = TSO_HOST + ":" + zkPort;
+     LOG.info("Starting ZK Server in port {}", zkPort);
+     zkServer = TestUtils.provideTestingZKServer(zkPort);
+     LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
+
+     zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest);
+
+     try {
+         zkClient.delete().forPath(CURRENT_TSO_PATH);
+         final Stat leftover = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
+         assertNull(leftover, CURRENT_TSO_PATH + " should not exist");
+     } catch (NoNodeException noNode) {
+         // Nothing to delete: the znode was not there in the first place
+         LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH);
+     }
+
+ }
+
+ /**
+ * Closes the ZK client and the testing ZK server started in beforeMethod().
+ * NOTE(review): the TSO server is not stopped here; each test stops it itself, so a test
+ * failing midway presumably leaves it running - confirm whether that matters.
+ */
+ @AfterMethod
+ public void afterMethod() {
+
+ zkClient.close();
+
+ CloseableUtils.closeQuietly(zkServer);
+ zkServer = null;
+ LOG.info("ZK Server Stopped");
+
+ }
+
+ /**
+  * Creates a TSO client from a default configuration while no TSO server is running and
+  * tolerates an {@code IllegalArgumentException}.
+  * NOTE(review): there is no fail() after newInstance(), so the test also passes when no
+  * exception is thrown - confirm whether that is intended.
+  */
+ @Test(timeOut = 30_000)
+ public void testUnsuccessfulConnectionToTSO() throws Exception {
+
+     // When no HA node for TSOServer is found & no host:port config exists
+     // we should get an exception when getting the client
+     final OmidClientConfiguration defaultConf = new OmidClientConfiguration();
+     try {
+         TSOClient.newInstance(defaultConf);
+     } catch (IllegalArgumentException expected) {
+         // Expected
+     }
+
+ }
+
+ /**
+  * Starts a TSO with the void lease module, connects a client through a plain host:port
+  * connection string and checks that the first start timestamp served is 1.
+  */
+ @Test(timeOut = 30_000)
+ public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception {
+
+     // Launch a TSO WITHOUT publishing the address in HA...
+     final TSOServerConfig serverConfig = new TSOServerConfig();
+     serverConfig.setMaxItems(1000);
+     serverConfig.setPort(tsoPortForTest);
+     serverConfig.setLeaseModule(new VoidLeaseManagementModule());
+     injector = Guice.createInjector(new TSOMockModule(serverConfig));
+     LOG.info("Starting TSO");
+     tsoServer = injector.getInstance(TSOServer.class);
+     tsoServer.startAndWait();
+     TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
+     LOG.info("Finished loading TSO");
+
+     // When no HA node for TSOServer is found we should get a connection
+     // to the TSO through the host:port configured...
+     final OmidClientConfiguration clientConfig = new OmidClientConfiguration();
+     clientConfig.setConnectionString("localhost:" + tsoPortForTest);
+     clientConfig.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+     final TSOClient client = TSOClient.newInstance(clientConfig);
+
+     // ... so we should get responses from the methods
+     final Long firstStartTimestamp = client.getNewStartTimestamp().get();
+     LOG.info("Start TS {} ", firstStartTimestamp);
+     assertEquals(firstStartTimestamp.longValue(), 1);
+
+     // Close the tsoClient connection and stop the TSO Server
+     client.close().get();
+     tsoServer.stopAndWait();
+     tsoServer = null;
+     TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+     LOG.info("TSO Server Stopped");
+
+ }
+
+ /**
+ * Starts a TSO configured with the HA lease module, waits until the current-TSO znode
+ * exists, connects a client configured with the HA connection type and the ZK cluster
+ * string, and checks that the first start timestamp served is 1.
+ */
+ @Test(timeOut = 30_000)
+ public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
+
+ // Launch a TSO publishing the address in HA...
+ TSOServerConfig config = new TSOServerConfig();
+ config.setMaxItems(1000);
+ config.setPort(tsoPortForTest);
+ config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
+ injector = Guice.createInjector(new TSOMockModule(config));
+ LOG.info("Starting TSO");
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
+ LOG.info("Finished loading TSO");
+
+ // The client is only created once the znode is visible through the injector's ZK client
+ waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
+
+ // When a HA node for TSOServer is found we should get a connection
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
+ tsoClientConf.setConnectionString(zkClusterForTest);
+ tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+ TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+ // ... so we should get responses from the methods
+ Long startTS = tsoClient.getNewStartTimestamp().get();
+ LOG.info("Start TS {} ", startTS);
+ assertEquals(startTS.longValue(), 1);
+
+ // Close the tsoClient connection and stop the TSO Server
+ tsoClient.close().get();
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+ LOG.info("TSO Server Stopped");
+
+ }
+
+ /**
+ * Starts a TSO with the HA lease module, connects a client, stops the server and checks
+ * that the next request fails with a ConnectionException, then starts a second TSO with
+ * the same configuration and retries until the client obtains a timestamp again.
+ * NOTE(review): the client created here is never closed and tsoServer is not reset to
+ * null after the final stop - confirm whether that is intentional.
+ */
+ @Test(timeOut = 30_000)
+ public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception {
+
+ // Start a TSO with HA...
+ TSOServerConfig config = new TSOServerConfig();
+ config.setMaxItems(1000);
+ config.setPort(tsoPortForTest);
+ config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
+ injector = Guice.createInjector(new TSOMockModule(config));
+ LOG.info("Starting Initial TSO");
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
+ LOG.info("Finished loading TSO");
+
+ waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
+
+ // Then create the TSO Client under test...
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
+ tsoClientConf.setConnectionString(zkClusterForTest);
+ tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+ TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+ // ... and check that initially we get responses from the methods
+ Long startTS = tsoClient.getNewStartTimestamp().get();
+ LOG.info("Start TS {} ", startTS);
+ assertEquals(startTS.longValue(), 1);
+
+ // Then stop the server...
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+ LOG.info("Initial TSO Server Stopped");
+
+ Thread.sleep(1500); // ...allow the client to receive disconnection event...
+ // ... and check that we get a conn exception when trying to access the client
+ try {
+ startTS = tsoClient.getNewStartTimestamp().get();
+ fail();
+ } catch (ExecutionException e) {
+ LOG.info("Exception expected");
+ // Internal accessor to fsm to do the required checkings
+ FsmImpl fsm = (FsmImpl) tsoClient.fsm;
+ assertEquals(e.getCause().getClass(), ConnectionException.class);
+ // Either of the two states is accepted at this point
+ assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class)
+ ||
+ fsm.getState().getClass().equals(TSOClient.DisconnectedState.class));
+ }
+
+ // After that, simulate that a new TSO has been launched...
+ Injector newInjector = Guice.createInjector(new TSOMockModule(config));
+ LOG.info("Re-Starting again the TSO");
+ tsoServer = newInjector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
+ LOG.info("Finished loading restarted TSO");
+
+ // Finally re-check that, eventually, we can get a new value from the new TSO...
+ // NOTE(review): this loop retries without any pause and has no bound of its own; only
+ // the 30 s test timeout ends it if the client never reconnects.
+ boolean reconnectionActive = false;
+ while (!reconnectionActive) {
+ try {
+ startTS = tsoClient.getNewStartTimestamp().get();
+ reconnectionActive = true;
+ } catch (ExecutionException e) {
+ // Expected
+ }
+ }
+ assertNotNull(startTS);
+
+ // ...and stop the server
+ tsoServer.stopAndWait();
+ TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+ LOG.info("Restarted TSO Server Stopped");
+ }
+
+ /**
+  * Blocks until the znode at {@code CURRENT_TSO_PATH} exists, polling every 500 ms.
+  * There is no upper bound on the wait; callers rely on their own test timeout.
+  *
+  * @param zkClient ZK client used to check for the znode
+  * @throws InterruptedException if the thread is interrupted while polling or sleeping
+  */
+ private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception {
+     while (true) {
+         try {
+             Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
+             if (stat != null) {
+                 LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString());
+                 return;
+             }
+             LOG.debug("TSO still has not registered yet, sleeping...");
+         } catch (InterruptedException ie) {
+             // Let an interruption (e.g. a test timeout) end the wait instead of swallowing it
+             throw ie;
+         } catch (Exception e) {
+             LOG.debug("TSO still has not registered yet, sleeping...", e);
+         }
+         // Back off on every unsuccessful attempt, not only after an exception, so the
+         // loop does not spin against ZooKeeper while the znode is missing
+         Thread.sleep(500);
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
new file mode 100644
index 0000000..0e15261
--- /dev/null
+++ b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.yahoo.omid.TestUtils;
+import com.yahoo.omid.committable.CommitTable;
+import com.yahoo.omid.proto.TSOProto;
+import com.yahoo.omid.tso.PausableTimestampOracle;
+import com.yahoo.omid.tso.TSOMockModule;
+import com.yahoo.omid.tso.TSOServer;
+import com.yahoo.omid.tso.TSOServerConfig;
+import com.yahoo.omid.tso.TimestampOracle;
+import com.yahoo.omid.tso.util.DummyCellIdImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TestTSOClientRequestAndResponseBehaviours {
+
+ // Logger shared by all the tests in this class
+ private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);
+
+ // Host and (fixed) port used both to start the TSO server and to build the client connection string
+ private static final String TSO_SERVER_HOST = "localhost";
+ private static final int TSO_SERVER_PORT = 1234;
+
+ // Two dummy cells that make up the write set sent in the commit requests of the tests
+ private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
+ private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
+
+ private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
+
+ // Default client configuration; re-created before each test method (see beforeMethod())
+ private OmidClientConfiguration tsoClientConf;
+
+ // Required infrastructure for TSOClient test
+ private TSOServer tsoServer;
+ // Timestamp oracle the tests pause/resume to hold back responses from the TSO
+ private PausableTimestampOracle pausableTSOracle;
+ // Commit table obtained from the same injector as the TSO server
+ private CommitTable commitTable;
+
+ /**
+  * Starts a single TSO server, built from a {@link TSOMockModule}, that is shared by all
+  * the tests in this class, and keeps references to its timestamp oracle and commit table.
+  */
+ @BeforeClass
+ public void setup() throws Exception {
+
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ tsoConfig.setMaxItems(1000);
+ tsoConfig.setPort(TSO_SERVER_PORT);
+ Module tsoServerMockModule = new TSOMockModule(tsoConfig);
+ Injector injector = Guice.createInjector(tsoServerMockModule);
+
+ LOG.info("==================================================================================================");
+ LOG.info("======================================= Init TSO Server ==========================================");
+ LOG.info("==================================================================================================");
+
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ // Do not continue until the server socket accepts connections
+ TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
+
+ LOG.info("==================================================================================================");
+ LOG.info("===================================== TSO Server Initialized =====================================");
+ LOG.info("==================================================================================================");
+
+ // NOTE(review): the cast assumes TSOMockModule binds TimestampOracle to a PausableTimestampOracle
+ pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
+ commitTable = injector.getInstance(CommitTable.class);
+
+ }
+
+ /**
+  * Stops the shared TSO server and waits until its port is no longer listening.
+  */
+ @AfterClass
+ public void tearDown() throws Exception {
+
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ // Wait for the socket to go away before the class finishes
+ TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
+
+ }
+
+ /**
+  * Gives every test method a brand new client configuration that points at the test TSO server.
+  */
+ @BeforeMethod
+ public void beforeMethod() {
+     final String connectionString = TSO_SERVER_HOST + ":" + TSO_SERVER_PORT;
+
+     OmidClientConfiguration freshConf = new OmidClientConfiguration();
+     freshConf.setConnectionString(connectionString);
+
+     // Publish the configuration only once it is fully set up
+     tsoClientConf = freshConf;
+ }
+
+ /**
+  * Resumes the timestamp oracle after each test, so a test that paused it (and possibly
+  * failed before resuming) does not leave it paused for the following tests.
+  */
+ @AfterMethod
+ public void afterMethod() {
+
+ pausableTSOracle.resume();
+
+ }
+
+ /**
+  * Test to ensure TSOClient timeouts are cancelled.
+  * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests
+  * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time.
+  * We send a lot of timestamp requests, and wait for them to complete.
+  * Ensure that the next request doesn't get hit by the timeouts of the previous
+  * requests. (i.e. make sure we cancel timeouts)
+  * <p>
+  * The request timeout and max retries used to compute the sleeps below are explicitly set in the
+  * configuration of the client under test, so the sleeps are sized against the timeout in force.
+  */
+ @Test(timeOut = 30_000)
+ public void testTimeoutsAreCancelled() throws Exception {
+
+     int requestTimeoutInMs = 500;
+     int requestMaxRetries = 5;
+
+     // Use a dedicated configuration so the values above are the ones the client really applies
+     OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
+     testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+     testTSOClientConf.setRequestTimeoutInMs(requestTimeoutInMs);
+     testTSOClientConf.setRequestMaxRetries(requestMaxRetries);
+     TSOClient client = TSOClient.newInstance(testTSOClientConf);
+     LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries);
+
+     // Fire a burst of timestamp requests and wait for the last one to complete
+     Future<Long> f = null;
+     for (int i = 0; i < (requestMaxRetries * 10); i++) {
+         f = client.getNewStartTimestamp();
+     }
+     if (f != null) {
+         f.get();
+     }
+
+     // Hold back the TSO and let most of the timeout period of the previous requests elapse...
+     pausableTSOracle.pause();
+     long msToSleep = ((long) (requestTimeoutInMs * 0.75));
+     LOG.info("Sleeping for {} ms", msToSleep);
+     TimeUnit.MILLISECONDS.sleep(msToSleep);
+
+     // ...then issue a new request whose lifetime overlaps the moment the old timeouts would fire
+     f = client.getNewStartTimestamp();
+     msToSleep = ((long) (requestTimeoutInMs * 0.9));
+     LOG.info("Sleeping for {} ms", msToSleep);
+     TimeUnit.MILLISECONDS.sleep(msToSleep);
+     LOG.info("Resuming");
+     pausableTSOracle.resume();
+     f.get();
+
+ }
+
+ /**
+  * Checks that commits in flight when the client channel is closed, issued by a client
+  * configured with zero retries, fail with a {@link ServiceUnavailableException} as cause.
+  */
+ @Test(timeOut = 30_000)
+ public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {
+
+ OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
+ testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ testTSOClientConf.setRequestMaxRetries(0);
+ TSOClient client = TSOClient.newInstance(testTSOClientConf);
+
+ // Get the start timestamps while the oracle is still running
+ List<Long> startTimestamps = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ startTimestamps.add(client.getNewStartTimestamp().get());
+ }
+
+ // Pause the oracle so the commits below stay pending when the channel is closed
+ pausableTSOracle.pause();
+
+ List<Future<Long>> futures = new ArrayList<>();
+ for (long s : startTimestamps) {
+ futures.add(client.commit(s, Sets.<CellId>newHashSet()));
+ }
+ TSOClientAccessor.closeChannel(client);
+
+ // Every pending commit must complete exceptionally
+ for (Future<Long> f : futures) {
+ try {
+ f.get();
+ fail("Shouldn't be able to complete");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof ServiceUnavailableException,
+ "Should be a service unavailable exception");
+ }
+ }
+ }
+
+ /**
+  * Test that if a client tries to make a request without handshaking, it will be disconnected.
+  * <p>
+  * The raw client is closed in a {@code finally} block so it is released even when one of
+  * the assertions fails.
+  */
+ @Test(timeOut = 30_000)
+ public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {
+
+     TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
+     try {
+         // Send a timestamp request straight away, skipping the handshake
+         TSOProto.Request request = TSOProto.Request.newBuilder()
+                 .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
+                 .build();
+         raw.write(request);
+         try {
+             raw.getResponse().get();
+             fail("Channel should be closed");
+         } catch (ExecutionException ee) {
+             assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception");
+         }
+     } finally {
+         raw.close();
+     }
+
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Test duplicate commits
+ // ----------------------------------------------------------------------------------------------------------------
+
+ /**
+ * This tests the case where messages arrive at the TSO out of order. This can happen when
+ * the channel gets dropped and the retry is done in a new channel. However, the TSO will respond with
+ * aborted to the original message because the retry was already committed and it would be prohibitively
+ * expensive to check all non-retry requests to see if they are already committed. For this reason
+ * a client must ensure that if it is sending a retry due to a socket error, the previous channel
+ * must be entirely closed so that it will not actually receive the abort response. TCP guarantees
+ * that this doesn't happen in non-socket error cases.
+ *
+ */
+ @Test(timeOut = 30_000)
+ public void testOutOfOrderMessages() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long ts1 = client.getNewStartTimestamp().get();
+
+ // The retry (isRetry = true) is sent BEFORE the original request (isRetry = false)
+ TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
+ TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
+ assertFalse(response1.getCommitResponse().getAborted(), "Retry Transaction should commit");
+ assertTrue(response2.getCommitResponse().getAborted(), "Transaction should abort");
+ }
+
+ /**
+  * Checks that when a commit request is aborted, the retry of that same commit request is
+  * reported as aborted too.
+  */
+ @Test(timeOut = 30_000)
+ public void testDuplicateCommitAborting() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long ts1 = client.getNewStartTimestamp().get();
+ long ts2 = client.getNewStartTimestamp().get();
+ // Commit the later transaction first, on the same write set the ts1 commits below use
+ client.commit(ts2, testWriteSet).get();
+
+ // Original request first, then its retry
+ TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
+ TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
+ assertTrue(response1.getCommitResponse().getAborted(), "Transaction should abort");
+ assertTrue(response2.getCommitResponse().getAborted(), "Retry commit should abort");
+ }
+
+ /**
+  * Checks that a commit request and its retry get the same commit timestamp in their responses.
+  */
+ @Test(timeOut = 30_000)
+ public void testDuplicateCommit() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long ts1 = client.getNewStartTimestamp().get();
+
+ // Original request first, then its retry
+ TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
+ TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
+ assertEquals(response2.getCommitResponse().getCommitTimestamp(),
+ response1.getCommitResponse().getCommitTimestamp(),
+ "Commit timestamp should be the same");
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Test TSOClient retry behaviour
+ // ----------------------------------------------------------------------------------------------------------------
+
+ /**
+  * Checks that a commit issued while the oracle is paused still completes successfully
+  * after the client channel is closed and the oracle is resumed.
+  */
+ @Test(timeOut = 30_000)
+ public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+
+ long ts1 = client.getNewStartTimestamp().get();
+ // Keep the commit pending on the server side while the channel is closed
+ pausableTSOracle.pause();
+ TSOFuture<Long> future = client.commit(ts1, testWriteSet);
+ TSOClientAccessor.closeChannel(client);
+ pausableTSOracle.resume();
+ // get() throws if the commit did not succeed, which fails the test
+ future.get();
+
+ }
+
+ /**
+  * Checks that a commit eventually succeeds when the oracle is paused for one second while
+  * the client is configured with a 100 ms request timeout and 10000 max retries.
+  */
+ @Test(timeOut = 30_000)
+ public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {
+
+ OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
+ testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ testTSOClientConf.setRequestTimeoutInMs(100);
+ testTSOClientConf.setRequestMaxRetries(10000);
+ TSOClient client = TSOClient.newInstance(testTSOClientConf);
+
+ long ts1 = client.getNewStartTimestamp().get();
+ pausableTSOracle.pause();
+ TSOFuture<Long> future = client.commit(ts1, testWriteSet);
+ // The pause (1 s) spans several request timeout periods (100 ms each)
+ TimeUnit.SECONDS.sleep(1);
+ pausableTSOracle.resume();
+ // get() throws if the commit did not succeed, which fails the test
+ future.get();
+ }
+
+ /**
+  * Checks that a commit fails with a {@link ServiceUnavailableException} as cause when the
+  * oracle stays paused and the client (100 ms request timeout, 10 max retries) runs out of retries.
+  * <p>
+  * The test fails explicitly if the commit completes without throwing; otherwise a commit
+  * that unexpectedly succeeds would make the test pass without checking anything.
+  */
+ @Test(timeOut = 30_000)
+ public void testCommitFailWhenTSOIsDown() throws Exception {
+
+     OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
+     testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+     testTSOClientConf.setRequestTimeoutInMs(100);
+     testTSOClientConf.setRequestMaxRetries(10);
+     TSOClient client = TSOClient.newInstance(testTSOClientConf);
+
+     long ts1 = client.getNewStartTimestamp().get();
+     // The oracle is left paused for the rest of the test; afterMethod() resumes it
+     pausableTSOracle.pause();
+     TSOFuture<Long> future = client.commit(ts1, testWriteSet);
+     try {
+         future.get();
+         fail("Commit shouldn't complete while the TSO is paused");
+     } catch (ExecutionException e) {
+         assertEquals(e.getCause().getClass(), ServiceUnavailableException.class,
+                      "Should be a ServiceUnavailableExeption");
+     }
+
+ }
+
+ /**
+  * Checks that a start timestamp request eventually succeeds when the oracle is paused for
+  * one second while the client is configured with a 100 ms request timeout and 10000 max retries.
+  */
+ @Test(timeOut = 30_000)
+ public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {
+
+ OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
+ testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ testTSOClientConf.setRequestTimeoutInMs(100);
+ testTSOClientConf.setRequestMaxRetries(10000);
+ TSOClient client = TSOClient.newInstance(testTSOClientConf);
+
+ pausableTSOracle.pause();
+ Future<Long> future = client.getNewStartTimestamp();
+ // The pause (1 s) spans several request timeout periods (100 ms each)
+ TimeUnit.SECONDS.sleep(1);
+ pausableTSOracle.resume();
+ // get() throws if the request did not succeed, which fails the test
+ future.get();
+
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
+ // (They exercise the communication protocol) TODO Remove???
+ // ----------------------------------------------------------------------------------------------------------------
+ /**
+  * Sends the same retry-flagged commit request twice and checks that the second response
+  * reports a commit (not aborted, no heuristic decision) with commit timestamp start + 1.
+  */
+ @Test
+ public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long tx1ST = client.getNewStartTimestamp().get();
+
+ // First request: its response is not inspected
+ clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ // Second request: the response under test
+ TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
+ assertFalse(response.getCommitResponse().getMakeHeuristicDecision());
+ assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + 1);
+ }
+
+ /**
+  * Invalidates the transaction through the commit table client before sending the
+  * retry-flagged commit request twice, and checks that the second response reports an abort
+  * (no heuristic decision, commit timestamp 0).
+  */
+ @Test
+ public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long tx1ST = client.getNewStartTimestamp().get();
+ // Invalidate the transaction
+ commitTable.getClient().tryInvalidateTransaction(tx1ST);
+
+ // First request: its response is not inspected
+ clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ // Second request: the response under test
+ TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
+ assertFalse(response.getCommitResponse().getMakeHeuristicDecision());
+ assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
+ }
+
+ /**
+  * Sends a retry-flagged commit request, completes the transaction through the commit table
+  * client, and checks that a second retry-flagged commit request gets an abort response
+  * (no heuristic decision, commit timestamp 0).
+  */
+ @Test
+ public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long tx1ST = client.getNewStartTimestamp().get();
+
+ // First request: its response is not inspected
+ clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+
+ // Simulate remove entry from the commit table before exercise retry
+ commitTable.getClient().completeTransaction(tx1ST);
+
+ // Second request: the response under test
+ TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");
+ assertFalse(response.getCommitResponse().getMakeHeuristicDecision());
+ assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
+ }
+ // ----------------------------------------------------------------------------------------------------------------
+ // The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
+ // (They exercise the communication protocol) TODO Remove???
+ // ----------------------------------------------------------------------------------------------------------------
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Helper methods
+ // ----------------------------------------------------------------------------------------------------------------
+
+ private TSOProto.Request createRetryCommitRequest(long ts) {
+ return createCommitRequest(ts, true, testWriteSet);
+ }
+
+ /**
+  * Builds a TSO request wrapping a commit request.
+  *
+  * @param ts       start timestamp of the transaction to commit
+  * @param retry    value for the "is retry" flag of the commit request
+  * @param writeSet cells whose ids are added to the commit request
+  * @return the request, ready to be sent to the TSO
+  */
+ private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
+     TSOProto.CommitRequest.Builder commitRequest = TSOProto.CommitRequest.newBuilder()
+             .setStartTimestamp(ts)
+             .setIsRetry(retry);
+     for (CellId writtenCell : writeSet) {
+         commitRequest.addCellId(writtenCell.getCellId());
+     }
+     return TSOProto.Request.newBuilder()
+             .setCommitRequest(commitRequest.build())
+             .build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientResponseHandling.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientResponseHandling.java b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientResponseHandling.java
new file mode 100644
index 0000000..3c2aa1e
--- /dev/null
+++ b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestTSOClientResponseHandling.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import com.yahoo.omid.tso.ProgrammableTSOServer;
+import com.yahoo.omid.tso.ProgrammableTSOServer.AbortResponse;
+import com.yahoo.omid.tso.ProgrammableTSOServer.CommitResponse;
+import com.yahoo.omid.tso.ProgrammableTSOServer.TimestampResponse;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+/**
+ * Tests how a {@link TSOClient} translates the responses received from the TSO into results
+ * or exceptions. The server side is a {@link ProgrammableTSOServer} in which each test queues
+ * the response it wants the client to receive.
+ */
+public class TestTSOClientResponseHandling {
+
+    private static final int TSO_PORT = 4321;
+    private static final long START_TS = 1L;
+    private static final long COMMIT_TS = 2L;
+
+    private ProgrammableTSOServer tsoServer = new ProgrammableTSOServer(TSO_PORT);
+    // Client under test
+    private TSOClient tsoClient;
+
+    /**
+     * Creates the client under test, pointing at the port of the programmable TSO server.
+     */
+    @BeforeClass
+    public void configureAndCreateClient() throws IOException, InterruptedException {
+
+        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString("localhost:" + TSO_PORT);
+        tsoClient = TSOClient.newInstance(tsoClientConf);
+    }
+
+    /**
+     * Clears the responses programmed in the server before each test.
+     */
+    @BeforeMethod
+    public void reset() {
+        tsoServer.cleanResponses();
+    }
+
+    @Test
+    public void testTimestampRequestReceivingASuccessfulResponse() throws Exception {
+        // test request timestamp response returns a timestamp
+
+        // Program the TSO to return an ad-hoc Timestamp response
+        tsoServer.queueResponse(new TimestampResponse(START_TS));
+
+        long startTS = tsoClient.getNewStartTimestamp().get();
+        assertEquals(startTS, START_TS);
+    }
+
+    @Test
+    public void testCommitRequestReceivingAnAbortResponse() throws Exception {
+        // test commit request which is aborted on the server side
+        // (e.g. due to conflicts with other transaction) throws an
+        // execution exception with an AbortException as a cause
+
+        // Program the TSO to return an Abort response
+        tsoServer.queueResponse(new AbortResponse(START_TS));
+
+        try {
+            tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
+            // Without this, the test would pass even if no exception was thrown at all
+            fail("Commit should have thrown an ExecutionException caused by an AbortException");
+        } catch (ExecutionException ee) {
+            assertEquals(ee.getCause().getClass(), AbortException.class);
+        }
+    }
+
+    @Test
+    public void testCommitRequestReceivingASuccessfulResponse() throws Exception {
+        // test commit request which is successfully committed on the server
+        // side returns a commit timestamp
+
+        // Program the TSO to return a Commit response (with no required heuristic actions)
+        tsoServer.queueResponse(new CommitResponse(false, START_TS, COMMIT_TS));
+
+        long commitTS = tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
+        assertEquals(commitTS, COMMIT_TS);
+    }
+
+    @Test
+    public void testCommitRequestReceivingAHeuristicResponse() throws Exception {
+        // test commit request which needs heuristic actions from the client
+        // throws an execution exception with a NewTSOException as a cause
+
+        // Program the TSO to return a Commit response requiring heuristic actions
+        tsoServer.queueResponse(new CommitResponse(true, START_TS, COMMIT_TS));
+        try {
+            tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
+            // Without this, the test would pass even if no exception was thrown at all
+            fail("Commit should have thrown an ExecutionException caused by a NewTSOException");
+        } catch (ExecutionException ee) {
+            assertEquals(ee.getCause().getClass(), NewTSOException.class);
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tso/client/TestUnconnectedTSOClient.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tso/client/TestUnconnectedTSOClient.java b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestUnconnectedTSOClient.java
new file mode 100644
index 0000000..70dec4e
--- /dev/null
+++ b/tso-server/src/test/java/com/yahoo/omid/tso/client/TestUnconnectedTSOClient.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import com.yahoo.omid.tso.util.DummyCellIdImpl;
+import com.yahoo.statemachine.StateMachine.FsmImpl;
+import org.slf4j.Logger;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+/**
+ * Test the behavior of requests on a TSOClient component that is not connected to a TSO server.
+ */
+public class TestUnconnectedTSOClient {
+
+ private static final Logger LOG = getLogger(TestUnconnectedTSOClient.class);
+
+ // Reconnection delay configured in the client; the test sleeps twice this long after each failure
+ private static final int TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST = 2;
+
+ /**
+  * Checks that timestamp and commit requests made on a client in disconnected state complete
+  * with a {@link ConnectionException} as cause instead of blocking, that the client is still
+  * in disconnected state after waiting twice the reconnection delay, and that the client can be closed.
+  */
+ @Test(timeOut = 30_000) // 30 secs
+ public void testRequestsDoneOnAnUnconnectedTSOClientAlwaysReturn() throws Exception {
+
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ // NOTE(review): presumably nothing listens on this port during the test - confirm
+ tsoClientConf.setConnectionString("localhost:12345");
+ tsoClientConf.setReconnectionDelayInSecs(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST);
+
+ // Component under test
+ TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+ // Internal accessor to fsm
+ FsmImpl fsm = (FsmImpl) tsoClient.fsm;
+
+ assertEquals(fsm.getState().getClass(), TSOClient.DisconnectedState.class);
+
+ // Test requests to the 3 relevant methods in TSO client
+
+ // 1) Start timestamp request
+ try {
+ tsoClient.getNewStartTimestamp().get();
+ fail();
+ } catch (ExecutionException e) {
+ LOG.info("Exception expected");
+ assertEquals(e.getCause().getClass(), ConnectionException.class);
+ // Wait twice the reconnection delay and check the client is still disconnected
+ TimeUnit.SECONDS.sleep(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST * 2);
+ assertEquals(fsm.getState().getClass(), TSOClient.DisconnectedState.class);
+ }
+
+ // 2) Commit request
+ try {
+ tsoClient.commit(1, newHashSet(new DummyCellIdImpl(0xdeadbeefL))).get();
+ fail();
+ } catch (ExecutionException e) {
+ LOG.info("Exception expected");
+ assertEquals(e.getCause().getClass(), ConnectionException.class);
+ // Wait twice the reconnection delay and check the client is still disconnected
+ TimeUnit.SECONDS.sleep(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST * 2);
+ assertEquals(fsm.getState().getClass(), TSOClient.DisconnectedState.class);
+ }
+
+ // 3) Close request: get() throws if closing fails, which fails the test
+ tsoClient.close().get();
+ LOG.info("No exception expected");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientAccessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientAccessor.java b/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientAccessor.java
deleted file mode 100644
index d98b353..0000000
--- a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientAccessor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import com.yahoo.omid.tsoclient.TSOClient.ConnectedState;
-import com.yahoo.statemachine.StateMachine.FsmImpl;
-
-public class TSOClientAccessor {
-
- public static void closeChannel(TSOClient tsoClient) throws InterruptedException {
- FsmImpl fsm = (FsmImpl) tsoClient.fsm;
- ConnectedState connectedState = (ConnectedState) fsm.getState();
- connectedState.channel.close().await();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientOneShot.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientOneShot.java b/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientOneShot.java
deleted file mode 100644
index eb0a49c..0000000
--- a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientOneShot.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import com.yahoo.omid.proto.TSOProto;
-import com.yahoo.omid.proto.TSOProto.Response;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutionException;
-
-/**
- * Communication endpoint for TSO clients.
- */
-public class TSOClientOneShot {
-
- private static final Logger LOG = LoggerFactory.getLogger(TSOClientOneShot.class);
-
- private final String host;
- private final int port;
-
- public TSOClientOneShot(String host, int port) {
-
- this.host = host;
- this.port = port;
-
- }
-
- public TSOProto.Response makeRequest(TSOProto.Request request)
- throws InterruptedException, ExecutionException {
- TSOClientRaw raw = new TSOClientRaw(host, port);
-
- // do handshake
- TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
- handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
- raw.write(TSOProto.Request.newBuilder()
- .setHandshakeRequest(handshake.build()).build());
- Response response = raw.getResponse().get();
- assert (response.getHandshakeResponse().getClientCompatible());
-
- raw.write(request);
- response = raw.getResponse().get();
-
- raw.close();
- return response;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientRaw.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientRaw.java b/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientRaw.java
deleted file mode 100644
index e6f7ebe..0000000
--- a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TSOClientRaw.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yahoo.omid.proto.TSOProto;
-import com.yahoo.omid.proto.TSOProto.Response;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Raw client for communicating with tso server directly with protobuf messages
- */
-public class TSOClientRaw {
-
- private static final Logger LOG = LoggerFactory.getLogger(TSOClientRaw.class);
-
- private final BlockingQueue<SettableFuture<Response>> responseQueue
- = new ArrayBlockingQueue<SettableFuture<Response>>(5);
- private final Channel channel;
-
- public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
- // Start client with Nb of active threads = 3 as maximum.
- ChannelFactory factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
- // Create the bootstrap
- ClientBootstrap bootstrap = new ClientBootstrap(factory);
-
- InetSocketAddress addr = new InetSocketAddress(host, port);
-
- ChannelPipeline pipeline = bootstrap.getPipeline();
- pipeline.addLast("lengthbaseddecoder",
- new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("protobufdecoder",
- new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
- pipeline.addLast("protobufencoder", new ProtobufEncoder());
-
- Handler handler = new Handler();
- pipeline.addLast("handler", handler);
-
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("connectTimeoutMillis", 100);
-
- ChannelFuture channelFuture = bootstrap.connect(addr).await();
- channel = channelFuture.getChannel();
- }
-
- public void write(TSOProto.Request request) {
- channel.write(request);
- }
-
- public Future<Response> getResponse() throws InterruptedException {
- SettableFuture<Response> future = SettableFuture.<Response>create();
- responseQueue.put(future);
- return future;
- }
-
- public void close() throws InterruptedException {
- responseQueue.put(SettableFuture.<Response>create());
- channel.close();
- }
-
- private class Handler extends SimpleChannelHandler {
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- LOG.info("Message received", e);
- if (e.getMessage() instanceof Response) {
- Response resp = (Response) e.getMessage();
- try {
- SettableFuture<Response> future = responseQueue.take();
- future.set(resp);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted in handler", ie);
- }
- } else {
- LOG.warn("Received unknown message", e.getMessage());
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- LOG.info("Exception received", e.getCause());
- try {
- SettableFuture<Response> future = responseQueue.take();
- future.setException(e.getCause());
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted handling exception", ie);
- }
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- LOG.info("Disconnected");
- try {
- SettableFuture<Response> future = responseQueue.take();
- future.setException(new ConnectionException());
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted handling exception", ie);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestIntegrationOfTSOClientServerBasicFunctionality.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestIntegrationOfTSOClientServerBasicFunctionality.java
deleted file mode 100644
index 5185d83..0000000
--- a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestIntegrationOfTSOClientServerBasicFunctionality.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import com.google.common.collect.Sets;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Module;
-import com.yahoo.omid.TestUtils;
-import com.yahoo.omid.committable.CommitTable;
-import com.yahoo.omid.tso.TSOMockModule;
-import com.yahoo.omid.tso.TSOServer;
-import com.yahoo.omid.tso.TSOServerConfig;
-import com.yahoo.omid.tso.util.DummyCellIdImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import static org.testng.AssertJUnit.assertNotNull;
-
-public class TestIntegrationOfTSOClientServerBasicFunctionality {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestIntegrationOfTSOClientServerBasicFunctionality.class);
-
- private static final String TSO_SERVER_HOST = "localhost";
- private int tsoServerPortForTest;
-
- // Cells for tests
- private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
- private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
-
- // Required infrastructure for TSO tsoClient-server integration testing
- private TSOServer tsoServer;
- private TSOClient tsoClient;
- private TSOClient justAnotherTSOClient;
- private CommitTable.Client commitTableClient;
-
- @BeforeClass
- public void setup() throws Exception {
-
- tsoServerPortForTest = TestUtils.getFreeLocalPort();
-
- TSOServerConfig tsoConfig = new TSOServerConfig();
- tsoConfig.setMaxItems(1000);
- tsoConfig.setPort(tsoServerPortForTest);
- Module tsoServerMockModule = new TSOMockModule(tsoConfig);
- Injector injector = Guice.createInjector(tsoServerMockModule);
-
- CommitTable commitTable = injector.getInstance(CommitTable.class);
- commitTableClient = commitTable.getClient();
-
- LOG.info("==================================================================================================");
- LOG.info("======================================= Init TSO Server ==========================================");
- LOG.info("==================================================================================================");
-
- tsoServer = injector.getInstance(TSOServer.class);
- tsoServer.startAndWait();
- TestUtils.waitForSocketListening(TSO_SERVER_HOST, tsoServerPortForTest, 100);
-
- LOG.info("==================================================================================================");
- LOG.info("===================================== TSO Server Initialized =====================================");
- LOG.info("==================================================================================================");
-
- LOG.info("==================================================================================================");
- LOG.info("======================================= Setup TSO Clients ========================================");
- LOG.info("==================================================================================================");
-
- // Configure direct connection to the server
- OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
- tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + tsoServerPortForTest);
-
- tsoClient = TSOClient.newInstance(tsoClientConf);
- justAnotherTSOClient = TSOClient.newInstance(tsoClientConf);
-
- LOG.info("==================================================================================================");
- LOG.info("===================================== TSO Clients Initialized ====================================");
- LOG.info("==================================================================================================");
-
- Thread.currentThread().setName("Test Thread");
-
- }
-
- @AfterClass
- public void tearDown() throws Exception {
-
- tsoClient.close().get();
-
- tsoServer.stopAndWait();
- tsoServer = null;
- TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, tsoServerPortForTest, 1000);
-
- }
-
- @Test(timeOut = 30_000)
- public void testTimestampsOrderingGrowMonotonically() throws Exception {
- long referenceTimestamp;
- long startTsTx1 = tsoClient.getNewStartTimestamp().get();
- referenceTimestamp = startTsTx1;
-
- long startTsTx2 = tsoClient.getNewStartTimestamp().get();
- assertEquals(startTsTx2, ++referenceTimestamp, "Should grow monotonically");
- assertTrue(startTsTx2 > startTsTx1, "Two timestamps obtained consecutively should grow");
-
- long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1)).get();
- assertEquals(commitTsTx2, ++referenceTimestamp, "Should grow monotonically");
-
- long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c2)).get();
- assertEquals(commitTsTx1, ++referenceTimestamp, "Should grow monotonically");
-
- long startTsTx3 = tsoClient.getNewStartTimestamp().get();
- assertEquals(startTsTx3, ++referenceTimestamp, "Should grow monotonically");
- }
-
- @Test(timeOut = 30_000)
- public void testSimpleTransactionWithNoWriteSetCanCommit() throws Exception {
- long startTsTx1 = tsoClient.getNewStartTimestamp().get();
- long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.<CellId>newHashSet()).get();
- assertTrue(commitTsTx1 > startTsTx1);
- }
-
- @Test(timeOut = 30_000)
- public void testTransactionWithMassiveWriteSetCanCommit() throws Exception {
- long startTs = tsoClient.getNewStartTimestamp().get();
-
- Set<CellId> cells = new HashSet<>();
- for (int i = 0; i < 1_000_000; i++) {
- cells.add(new DummyCellIdImpl(i));
- }
-
- long commitTs = tsoClient.commit(startTs, cells).get();
- assertTrue(commitTs > startTs, "Commit TS should be higher than Start TS");
- }
-
- @Test(timeOut = 30_000)
- public void testMultipleSerialCommitsDoNotConflict() throws Exception {
- long startTsTx1 = tsoClient.getNewStartTimestamp().get();
- long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
- assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be greater than Start TS");
-
- long startTsTx2 = tsoClient.getNewStartTimestamp().get();
- assertTrue(startTsTx2 > commitTsTx1, "TS should grow monotonically");
-
- long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
- assertTrue(commitTsTx2 > startTsTx2, "Commit TS must be greater than Start TS");
-
- long startTsTx3 = tsoClient.getNewStartTimestamp().get();
- long commitTsTx3 = tsoClient.commit(startTsTx3, Sets.newHashSet(c2)).get();
- assertTrue(commitTsTx3 > startTsTx3, "Commit TS must be greater than Start TS");
- }
-
- @Test(timeOut = 30_000)
- public void testCommitWritesToCommitTable() throws Exception {
- long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
- long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
- assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
-
- assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
- "Commit TS for Tx1 shouldn't appear in Commit Table");
-
- long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
- assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
-
- Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
- assertNotNull("Tx is committed, should return as such from Commit Table", commitTs1InCommitTable);
- assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
- "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
- assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
- }
-
- @Test(timeOut = 30_000)
- public void testTwoConcurrentTxWithOverlappingWritesetsHaveConflicts() throws Exception {
- long startTsTx1 = tsoClient.getNewStartTimestamp().get();
- long startTsTx2 = tsoClient.getNewStartTimestamp().get();
- assertTrue(startTsTx2 > startTsTx1, "Second TX should have higher TS");
-
- long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
- assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be higher than Start TS for the same tx");
-
- try {
- tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
- Assert.fail("Second TX should fail on commit");
- } catch (ExecutionException ee) {
- assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
- }
- }
-
- @Test(timeOut = 30_000)
- public void testConflictsAndMonotonicallyTimestampGrowthWithTwoDifferentTSOClients() throws Exception {
- long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
- long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
- long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
-
- tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
- try {
- tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
- Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
- } catch (ExecutionException ee) {
- assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
- }
- long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
-
- assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
- long commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
- assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
- assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
- }
-
-}