You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ik...@apache.org on 2016/04/21 02:02:32 UTC
[15/52] [abbrv] incubator-omid git commit: Move com.yahoo ->
org.apache
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
new file mode 100644
index 0000000..66a70fb
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.timestamp.storage.TimestampStorage;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class TestTimestampOracle {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTimestampOracle.class);
+
+ @Mock
+ private MetricsRegistry metrics;
+ @Mock
+ private Panicker panicker;
+ @Mock
+ private TimestampStorage timestampStorage;
+
+ // Component under test
+ @InjectMocks
+ private TimestampOracleImpl timestampOracle;
+
+ @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+ public void initMocksAndComponents() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test(timeOut = 10_000)
+ public void testMonotonicTimestampGrowth() throws Exception {
+
+ // Intialize component under test
+ timestampOracle.initialize();
+
+ long last = timestampOracle.next();
+ for (int i = 0; i < (3 * TimestampOracleImpl.TIMESTAMP_BATCH); i++) {
+ long current = timestampOracle.next();
+ assertEquals(current, last + 1, "Not monotonic growth");
+ last = current;
+ }
+ assertTrue(timestampOracle.getLast() == last);
+ LOG.info("Last timestamp: {}", last);
+ }
+
+ @Test(timeOut = 10_000)
+ public void testTimestampOraclePanicsWhenTheStorageHasProblems() throws Exception {
+
+ // Intialize component under test
+ timestampOracle.initialize();
+
+ // Cause an exception when updating the max timestamp
+ final CountDownLatch updateMaxTimestampMethodCalled = new CountDownLatch(1);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ updateMaxTimestampMethodCalled.countDown();
+ throw new RuntimeException("Out of memory or something");
+ }
+ }).when(timestampStorage).updateMaxTimestamp(anyLong(), anyLong());
+
+ // Make the previous exception to be thrown
+ Thread allocThread = new Thread("AllocThread") {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ timestampOracle.next();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Shouldn't occur");
+ }
+ }
+ };
+ allocThread.start();
+
+ updateMaxTimestampMethodCalled.await();
+
+ // Verify that it has blown up
+ verify(panicker, atLeastOnce()).panic(anyString(), any(Throwable.class));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientAccessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientAccessor.java b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientAccessor.java
new file mode 100644
index 0000000..74ed196
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientAccessor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import org.apache.statemachine.StateMachine.FsmImpl;
+
+public class TSOClientAccessor {
+
+ public static void closeChannel(TSOClient tsoClient) throws InterruptedException {
+ FsmImpl fsm = (FsmImpl) tsoClient.fsm;
+ TSOClient.ConnectedState connectedState = (TSOClient.ConnectedState) fsm.getState();
+ connectedState.channel.close().await();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientOneShot.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientOneShot.java b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientOneShot.java
new file mode 100644
index 0000000..ff60753
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientOneShot.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import org.apache.omid.proto.TSOProto;
+import org.apache.omid.proto.TSOProto.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Communication endpoint for TSO clients.
+ */
+public class TSOClientOneShot {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TSOClientOneShot.class);
+
+ private final String host;
+ private final int port;
+
+ public TSOClientOneShot(String host, int port) {
+
+ this.host = host;
+ this.port = port;
+
+ }
+
+ public TSOProto.Response makeRequest(TSOProto.Request request)
+ throws InterruptedException, ExecutionException {
+ TSOClientRaw raw = new TSOClientRaw(host, port);
+
+ // do handshake
+ TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
+ handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
+ raw.write(TSOProto.Request.newBuilder()
+ .setHandshakeRequest(handshake.build()).build());
+ Response response = raw.getResponse().get();
+ assert (response.getHandshakeResponse().getClientCompatible());
+
+ raw.write(request);
+ response = raw.getResponse().get();
+
+ raw.close();
+ return response;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
new file mode 100644
index 0000000..beb6c47
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.omid.proto.TSOProto;
+import org.apache.omid.proto.TSOProto.Response;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Raw client for communicating with tso server directly with protobuf messages
+ */
+public class TSOClientRaw {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TSOClientRaw.class);
+
+ private final BlockingQueue<SettableFuture<Response>> responseQueue
+ = new ArrayBlockingQueue<SettableFuture<Response>>(5);
+ private final Channel channel;
+
+ public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
+ // Start client with Nb of active threads = 3 as maximum.
+ ChannelFactory factory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
+ // Create the bootstrap
+ ClientBootstrap bootstrap = new ClientBootstrap(factory);
+
+ InetSocketAddress addr = new InetSocketAddress(host, port);
+
+ ChannelPipeline pipeline = bootstrap.getPipeline();
+ pipeline.addLast("lengthbaseddecoder",
+ new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+ pipeline.addLast("protobufdecoder",
+ new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+ pipeline.addLast("protobufencoder", new ProtobufEncoder());
+
+ Handler handler = new Handler();
+ pipeline.addLast("handler", handler);
+
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+ bootstrap.setOption("connectTimeoutMillis", 100);
+
+ ChannelFuture channelFuture = bootstrap.connect(addr).await();
+ channel = channelFuture.getChannel();
+ }
+
+ public void write(TSOProto.Request request) {
+ channel.write(request);
+ }
+
+ public Future<Response> getResponse() throws InterruptedException {
+ SettableFuture<Response> future = SettableFuture.<Response>create();
+ responseQueue.put(future);
+ return future;
+ }
+
+ public void close() throws InterruptedException {
+ responseQueue.put(SettableFuture.<Response>create());
+ channel.close();
+ }
+
+ private class Handler extends SimpleChannelHandler {
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ LOG.info("Message received", e);
+ if (e.getMessage() instanceof Response) {
+ Response resp = (Response) e.getMessage();
+ try {
+ SettableFuture<Response> future = responseQueue.take();
+ future.set(resp);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted in handler", ie);
+ }
+ } else {
+ LOG.warn("Received unknown message", e.getMessage());
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.info("Exception received", e.getCause());
+ try {
+ SettableFuture<Response> future = responseQueue.take();
+ future.setException(e.getCause());
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted handling exception", ie);
+ }
+ }
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception {
+ LOG.info("Disconnected");
+ try {
+ SettableFuture<Response> future = responseQueue.take();
+ future.setException(new ConnectionException());
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted handling exception", ie);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
new file mode 100644
index 0000000..9f7263e
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.util.DummyCellIdImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertNotNull;
+
+public class TestIntegrationOfTSOClientServerBasicFunctionality {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestIntegrationOfTSOClientServerBasicFunctionality.class);
+
+ private static final String TSO_SERVER_HOST = "localhost";
+ private int tsoServerPortForTest;
+
+ // Cells for tests
+ private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
+ private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
+
+ // Required infrastructure for TSO tsoClient-server integration testing
+ private TSOServer tsoServer;
+ private TSOClient tsoClient;
+ private TSOClient justAnotherTSOClient;
+ private CommitTable.Client commitTableClient;
+
+ @BeforeClass
+ public void setup() throws Exception {
+
+ tsoServerPortForTest = TestUtils.getFreeLocalPort();
+
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ tsoConfig.setMaxItems(1000);
+ tsoConfig.setPort(tsoServerPortForTest);
+ Module tsoServerMockModule = new TSOMockModule(tsoConfig);
+ Injector injector = Guice.createInjector(tsoServerMockModule);
+
+ CommitTable commitTable = injector.getInstance(CommitTable.class);
+ commitTableClient = commitTable.getClient();
+
+ LOG.info("==================================================================================================");
+ LOG.info("======================================= Init TSO Server ==========================================");
+ LOG.info("==================================================================================================");
+
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_SERVER_HOST, tsoServerPortForTest, 100);
+
+ LOG.info("==================================================================================================");
+ LOG.info("===================================== TSO Server Initialized =====================================");
+ LOG.info("==================================================================================================");
+
+ LOG.info("==================================================================================================");
+ LOG.info("======================================= Setup TSO Clients ========================================");
+ LOG.info("==================================================================================================");
+
+ // Configure direct connection to the server
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + tsoServerPortForTest);
+
+ tsoClient = TSOClient.newInstance(tsoClientConf);
+ justAnotherTSOClient = TSOClient.newInstance(tsoClientConf);
+
+ LOG.info("==================================================================================================");
+ LOG.info("===================================== TSO Clients Initialized ====================================");
+ LOG.info("==================================================================================================");
+
+ Thread.currentThread().setName("Test Thread");
+
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+
+ tsoClient.close().get();
+
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, tsoServerPortForTest, 1000);
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testTimestampsOrderingGrowMonotonically() throws Exception {
+ long referenceTimestamp;
+ long startTsTx1 = tsoClient.getNewStartTimestamp().get();
+ referenceTimestamp = startTsTx1;
+
+ long startTsTx2 = tsoClient.getNewStartTimestamp().get();
+ assertEquals(startTsTx2, ++referenceTimestamp, "Should grow monotonically");
+ assertTrue(startTsTx2 > startTsTx1, "Two timestamps obtained consecutively should grow");
+
+ long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1)).get();
+ assertEquals(commitTsTx2, ++referenceTimestamp, "Should grow monotonically");
+
+ long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c2)).get();
+ assertEquals(commitTsTx1, ++referenceTimestamp, "Should grow monotonically");
+
+ long startTsTx3 = tsoClient.getNewStartTimestamp().get();
+ assertEquals(startTsTx3, ++referenceTimestamp, "Should grow monotonically");
+ }
+
+ @Test(timeOut = 30_000)
+ public void testSimpleTransactionWithNoWriteSetCanCommit() throws Exception {
+ long startTsTx1 = tsoClient.getNewStartTimestamp().get();
+ long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.<CellId>newHashSet()).get();
+ assertTrue(commitTsTx1 > startTsTx1);
+ }
+
+ @Test(timeOut = 30_000)
+ public void testTransactionWithMassiveWriteSetCanCommit() throws Exception {
+ long startTs = tsoClient.getNewStartTimestamp().get();
+
+ Set<CellId> cells = new HashSet<>();
+ for (int i = 0; i < 1_000_000; i++) {
+ cells.add(new DummyCellIdImpl(i));
+ }
+
+ long commitTs = tsoClient.commit(startTs, cells).get();
+ assertTrue(commitTs > startTs, "Commit TS should be higher than Start TS");
+ }
+
+ @Test(timeOut = 30_000)
+ public void testMultipleSerialCommitsDoNotConflict() throws Exception {
+ long startTsTx1 = tsoClient.getNewStartTimestamp().get();
+ long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
+ assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be greater than Start TS");
+
+ long startTsTx2 = tsoClient.getNewStartTimestamp().get();
+ assertTrue(startTsTx2 > commitTsTx1, "TS should grow monotonically");
+
+ long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
+ assertTrue(commitTsTx2 > startTsTx2, "Commit TS must be greater than Start TS");
+
+ long startTsTx3 = tsoClient.getNewStartTimestamp().get();
+ long commitTsTx3 = tsoClient.commit(startTsTx3, Sets.newHashSet(c2)).get();
+ assertTrue(commitTsTx3 > startTsTx3, "Commit TS must be greater than Start TS");
+ }
+
+ @Test(timeOut = 30_000)
+ public void testCommitWritesToCommitTable() throws Exception {
+ long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
+ long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
+ assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
+
+ assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
+ "Commit TS for Tx1 shouldn't appear in Commit Table");
+
+ long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
+ assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
+
+ Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
+ assertNotNull("Tx is committed, should return as such from Commit Table", commitTs1InCommitTable);
+ assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
+ "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
+ assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+ }
+
+ @Test(timeOut = 30_000)
+ public void testTwoConcurrentTxWithOverlappingWritesetsHaveConflicts() throws Exception {
+ long startTsTx1 = tsoClient.getNewStartTimestamp().get();
+ long startTsTx2 = tsoClient.getNewStartTimestamp().get();
+ assertTrue(startTsTx2 > startTsTx1, "Second TX should have higher TS");
+
+ long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
+ assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be higher than Start TS for the same tx");
+
+ try {
+ tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
+ Assert.fail("Second TX should fail on commit");
+ } catch (ExecutionException ee) {
+ assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
+ }
+ }
+
+ @Test(timeOut = 30_000)
+ public void testConflictsAndMonotonicallyTimestampGrowthWithTwoDifferentTSOClients() throws Exception {
+ long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
+ long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
+ long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
+
+ tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
+ try {
+ tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
+ Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
+ } catch (ExecutionException ee) {
+ assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
+ }
+ long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
+
+ assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
+ long commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
+ assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
+ assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
new file mode 100644
index 0000000..2b4c312
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.omid.TestUtils;
+import org.apache.omid.tso.HALeaseManagementModule;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.VoidLeaseManagementModule;
+import org.apache.statemachine.StateMachine.FsmImpl;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TestTSOClientConnectionToTSO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class);
+
+ // Constants and variables for component connectivity
+ private static final String TSO_HOST = "localhost";
+ private static final String CURRENT_TSO_PATH = "/current_tso_path";
+ private static final String TSO_LEASE_PATH = "/tso_lease_path";
+
+ private int tsoPortForTest;
+ private String zkClusterForTest;
+
+ private Injector injector = null;
+
+ private TestingServer zkServer;
+
+ private CuratorFramework zkClient;
+ private TSOServer tsoServer;
+
+ @BeforeMethod
+ public void beforeMethod() throws Exception {
+
+ tsoPortForTest = TestUtils.getFreeLocalPort();
+
+ int zkPortForTest = TestUtils.getFreeLocalPort();
+ zkClusterForTest = TSO_HOST + ":" + zkPortForTest;
+ LOG.info("Starting ZK Server in port {}", zkPortForTest);
+ zkServer = TestUtils.provideTestingZKServer(zkPortForTest);
+ LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
+
+ zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest);
+
+ Stat stat;
+ try {
+ zkClient.delete().forPath(CURRENT_TSO_PATH);
+ stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
+ assertNull(stat, CURRENT_TSO_PATH + " should not exist");
+ } catch (NoNodeException e) {
+ LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH);
+ }
+
+ }
+
+ @AfterMethod
+ public void afterMethod() {
+
+ zkClient.close();
+
+ CloseableUtils.closeQuietly(zkServer);
+ zkServer = null;
+ LOG.info("ZK Server Stopped");
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testUnsuccessfulConnectionToTSO() throws Exception {
+
+ // When no HA node for TSOServer is found & no host:port config exists
+ // we should get an exception when getting the client
+ try {
+ TSOClient.newInstance(new OmidClientConfiguration());
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception {
+
+ // Launch a TSO WITHOUT publishing the address in HA...
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ tsoConfig.setMaxItems(1000);
+ tsoConfig.setPort(tsoPortForTest);
+ tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
+ injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+ LOG.info("Starting TSO");
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
+ LOG.info("Finished loading TSO");
+
+ // When no HA node for TSOServer is found we should get a connection
+ // to the TSO through the host:port configured...
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
+ tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+ TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+ // ... so we should get responses from the methods
+ Long startTS = tsoClient.getNewStartTimestamp().get();
+ LOG.info("Start TS {} ", startTS);
+ assertEquals(startTS.longValue(), 1);
+
+ // Close the tsoClient connection and stop the TSO Server
+ tsoClient.close().get();
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+ LOG.info("TSO Server Stopped");
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
+
+ // Launch a TSO publishing the address in HA...
+ TSOServerConfig config = new TSOServerConfig();
+ config.setMaxItems(1000);
+ config.setPort(tsoPortForTest);
+ config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
+ injector = Guice.createInjector(new TSOMockModule(config));
+ LOG.info("Starting TSO");
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
+ LOG.info("Finished loading TSO");
+
+ waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
+
+ // When a HA node for TSOServer is found we should get a connection
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
+ tsoClientConf.setConnectionString(zkClusterForTest);
+ tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+ TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+ // ... so we should get responses from the methods
+ Long startTS = tsoClient.getNewStartTimestamp().get();
+ LOG.info("Start TS {} ", startTS);
+ assertEquals(startTS.longValue(), 1);
+
+ // Close the tsoClient connection and stop the TSO Server
+ tsoClient.close().get();
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+ LOG.info("TSO Server Stopped");
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception {
+
+ // Start a TSO with HA...
+ TSOServerConfig config = new TSOServerConfig();
+ config.setMaxItems(1000);
+ config.setPort(tsoPortForTest);
+ config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
+ injector = Guice.createInjector(new TSOMockModule(config));
+ LOG.info("Starting Initial TSO");
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
+ LOG.info("Finished loading TSO");
+
+ waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));
+
+ // Then create the TSO Client under test...
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
+ tsoClientConf.setConnectionString(zkClusterForTest);
+ tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+ TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+ // ... and check that initially we get responses from the methods
+ Long startTS = tsoClient.getNewStartTimestamp().get();
+ LOG.info("Start TS {} ", startTS);
+ assertEquals(startTS.longValue(), 1);
+
+ // Then stop the server...
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+ LOG.info("Initial TSO Server Stopped");
+
+ Thread.sleep(1500); // ...allow the client to receive disconnection event...
+ // ... and check that we get a conn exception when trying to access the client
+ try {
+ startTS = tsoClient.getNewStartTimestamp().get();
+ fail();
+ } catch (ExecutionException e) {
+ LOG.info("Exception expected");
+ // Internal accessor to fsm to do the required checkings
+ FsmImpl fsm = (FsmImpl) tsoClient.fsm;
+ assertEquals(e.getCause().getClass(), ConnectionException.class);
+ assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class)
+ ||
+ fsm.getState().getClass().equals(TSOClient.DisconnectedState.class));
+ }
+
+ // After that, simulate that a new TSO has been launched...
+ Injector newInjector = Guice.createInjector(new TSOMockModule(config));
+ LOG.info("Re-Starting again the TSO");
+ tsoServer = newInjector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100);
+ LOG.info("Finished loading restarted TSO");
+
+ // Finally re-check that, eventually, we can get a new value from the new TSO...
+ boolean reconnectionActive = false;
+ while (!reconnectionActive) {
+ try {
+ startTS = tsoClient.getNewStartTimestamp().get();
+ reconnectionActive = true;
+ } catch (ExecutionException e) {
+ // Expected
+ }
+ }
+ assertNotNull(startTS);
+
+ // ...and stop the server
+ tsoServer.stopAndWait();
+ TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+ LOG.info("Restarted TSO Server Stopped");
+ }
+
+ private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception {
+ while (true) {
+ try {
+ Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
+ if (stat == null) {
+ continue;
+ }
+ LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString());
+ if (stat.toString().length() == 0) {
+ continue;
+ }
+ return;
+ } catch (Exception e) {
+ LOG.debug("TSO still has not registered yet, sleeping...", e);
+ Thread.sleep(500);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
new file mode 100644
index 0000000..44c4858
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.proto.TSOProto;
+import org.apache.omid.tso.PausableTimestampOracle;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.TimestampOracle;
+import org.apache.omid.tso.util.DummyCellIdImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TestTSOClientRequestAndResponseBehaviours {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);
+
+ private static final String TSO_SERVER_HOST = "localhost";
+ private static final int TSO_SERVER_PORT = 1234;
+
+ private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
+ private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
+
+ private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
+
+ private OmidClientConfiguration tsoClientConf;
+
+ // Required infrastructure for TSOClient test
+ private TSOServer tsoServer;
+ private PausableTimestampOracle pausableTSOracle;
+ private CommitTable commitTable;
+
+ @BeforeClass
+ public void setup() throws Exception {
+
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ tsoConfig.setMaxItems(1000);
+ tsoConfig.setPort(TSO_SERVER_PORT);
+ Module tsoServerMockModule = new TSOMockModule(tsoConfig);
+ Injector injector = Guice.createInjector(tsoServerMockModule);
+
+ LOG.info("==================================================================================================");
+ LOG.info("======================================= Init TSO Server ==========================================");
+ LOG.info("==================================================================================================");
+
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
+
+ LOG.info("==================================================================================================");
+ LOG.info("===================================== TSO Server Initialized =====================================");
+ LOG.info("==================================================================================================");
+
+ pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
+ commitTable = injector.getInstance(CommitTable.class);
+
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
+
+ }
+
+ @BeforeMethod
+ public void beforeMethod() {
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+
+ this.tsoClientConf = tsoClientConf;
+
+ }
+
+ @AfterMethod
+ public void afterMethod() {
+
+ pausableTSOracle.resume();
+
+ }
+
+ /**
+ * Test to ensure TSOClient timeouts are cancelled.
+ * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests
+ * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time.
+ * We send a lot of timestamp requests, and wait for them to complete.
+ * Ensure that the next request doesn't get hit by the timeouts of the previous
+ * requests. (i.e. make sure we cancel timeouts)
+ */
+ @Test(timeOut = 30_000)
+ public void testTimeoutsAreCancelled() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ int requestTimeoutInMs = 500;
+ int requestMaxRetries = 5;
+ LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries);
+ Future<Long> f = null;
+ for (int i = 0; i < (requestMaxRetries * 10); i++) {
+ f = client.getNewStartTimestamp();
+ }
+ if (f != null) {
+ f.get();
+ }
+ pausableTSOracle.pause();
+ long msToSleep = ((long) (requestTimeoutInMs * 0.75));
+ LOG.info("Sleeping for {} ms", msToSleep);
+ TimeUnit.MILLISECONDS.sleep(msToSleep);
+ f = client.getNewStartTimestamp();
+ msToSleep = ((long) (requestTimeoutInMs * 0.9));
+ LOG.info("Sleeping for {} ms", msToSleep);
+ TimeUnit.MILLISECONDS.sleep(msToSleep);
+ LOG.info("Resuming");
+ pausableTSOracle.resume();
+ f.get();
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {
+
+ OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
+ testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ testTSOClientConf.setRequestMaxRetries(0);
+ TSOClient client = TSOClient.newInstance(testTSOClientConf);
+
+ List<Long> startTimestamps = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ startTimestamps.add(client.getNewStartTimestamp().get());
+ }
+
+ pausableTSOracle.pause();
+
+ List<Future<Long>> futures = new ArrayList<>();
+ for (long s : startTimestamps) {
+ futures.add(client.commit(s, Sets.<CellId>newHashSet()));
+ }
+ TSOClientAccessor.closeChannel(client);
+
+ for (Future<Long> f : futures) {
+ try {
+ f.get();
+ fail("Shouldn't be able to complete");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof ServiceUnavailableException,
+ "Should be a service unavailable exception");
+ }
+ }
+ }
+
+ /**
+ * Test that if a client tries to make a request without handshaking, it will be disconnected.
+ */
+ @Test(timeOut = 30_000)
+ public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {
+
+ TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ TSOProto.Request request = TSOProto.Request.newBuilder()
+ .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
+ .build();
+ raw.write(request);
+ try {
+ raw.getResponse().get();
+ fail("Channel should be closed");
+ } catch (ExecutionException ee) {
+ assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception");
+ }
+ raw.close();
+
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Test duplicate commits
+ // ----------------------------------------------------------------------------------------------------------------
+
+ /**
+ * This tests the case where messages arrive at the TSO out of order. This can happen in the case
+ * the channel get dropped and the retry is done in a new channel. However, the TSO will respond with
+ * aborted to the original message because the retry was already committed and it would be prohibitively
+ * expensive to check all non-retry requests to see if they are already committed. For this reason
+ * a client must ensure that if it is sending a retry due to a socket error, the previous channel
+ * must be entirely closed so that it will not actually receive the abort response. TCP guarantees
+ * that this doesn't happen in non-socket error cases.
+ *
+ */
+ @Test(timeOut = 30_000)
+ public void testOutOfOrderMessages() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long ts1 = client.getNewStartTimestamp().get();
+
+ TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
+ TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
+ assertFalse(response1.getCommitResponse().getAborted(), "Retry Transaction should commit");
+ assertTrue(response2.getCommitResponse().getAborted(), "Transaction should abort");
+ }
+
+ @Test(timeOut = 30_000)
+ public void testDuplicateCommitAborting() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long ts1 = client.getNewStartTimestamp().get();
+ long ts2 = client.getNewStartTimestamp().get();
+ client.commit(ts2, testWriteSet).get();
+
+ TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
+ TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
+ assertTrue(response1.getCommitResponse().getAborted(), "Transaction should abort");
+ assertTrue(response2.getCommitResponse().getAborted(), "Retry commit should abort");
+ }
+
+ @Test(timeOut = 30_000)
+ public void testDuplicateCommit() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long ts1 = client.getNewStartTimestamp().get();
+
+ TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
+ TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
+ assertEquals(response2.getCommitResponse().getCommitTimestamp(),
+ response1.getCommitResponse().getCommitTimestamp(),
+ "Commit timestamp should be the same");
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Test TSOClient retry behaviour
+ // ----------------------------------------------------------------------------------------------------------------
+
+ @Test(timeOut = 30_000)
+ public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+
+ long ts1 = client.getNewStartTimestamp().get();
+ pausableTSOracle.pause();
+ TSOFuture<Long> future = client.commit(ts1, testWriteSet);
+ TSOClientAccessor.closeChannel(client);
+ pausableTSOracle.resume();
+ future.get();
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {
+
+ OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
+ testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ testTSOClientConf.setRequestTimeoutInMs(100);
+ testTSOClientConf.setRequestMaxRetries(10000);
+ TSOClient client = TSOClient.newInstance(testTSOClientConf);
+
+ long ts1 = client.getNewStartTimestamp().get();
+ pausableTSOracle.pause();
+ TSOFuture<Long> future = client.commit(ts1, testWriteSet);
+ TimeUnit.SECONDS.sleep(1);
+ pausableTSOracle.resume();
+ future.get();
+ }
+
+ @Test(timeOut = 30_000)
+ public void testCommitFailWhenTSOIsDown() throws Exception {
+
+ OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
+ testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ testTSOClientConf.setRequestTimeoutInMs(100);
+ testTSOClientConf.setRequestMaxRetries(10);
+ TSOClient client = TSOClient.newInstance(testTSOClientConf);
+
+ long ts1 = client.getNewStartTimestamp().get();
+ pausableTSOracle.pause();
+ TSOFuture<Long> future = client.commit(ts1, testWriteSet);
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ assertEquals(e.getCause().getClass(), ServiceUnavailableException.class,
+ "Should be a ServiceUnavailableExeption");
+ }
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {
+
+ OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
+ testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ testTSOClientConf.setRequestTimeoutInMs(100);
+ testTSOClientConf.setRequestMaxRetries(10000);
+ TSOClient client = TSOClient.newInstance(testTSOClientConf);
+
+ pausableTSOracle.pause();
+ Future<Long> future = client.getNewStartTimestamp();
+ TimeUnit.SECONDS.sleep(1);
+ pausableTSOracle.resume();
+ future.get();
+
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
+ // (They exercise the communication protocol) TODO Remove???
+ // ----------------------------------------------------------------------------------------------------------------
+ @Test
+ public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long tx1ST = client.getNewStartTimestamp().get();
+
+ clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
+ assertFalse(response.getCommitResponse().getMakeHeuristicDecision());
+ assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + 1);
+ }
+
+ @Test
+ public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long tx1ST = client.getNewStartTimestamp().get();
+ // Invalidate the transaction
+ commitTable.getClient().tryInvalidateTransaction(tx1ST);
+
+ clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
+ assertFalse(response.getCommitResponse().getMakeHeuristicDecision());
+ assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
+ }
+
+ @Test
+ public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+
+ long tx1ST = client.getNewStartTimestamp().get();
+
+ clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+
+ // Simulate remove entry from the commit table before exercise retry
+ commitTable.getClient().completeTransaction(tx1ST);
+
+ TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
+ assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");
+ assertFalse(response.getCommitResponse().getMakeHeuristicDecision());
+ assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
+ }
+ // ----------------------------------------------------------------------------------------------------------------
+ // The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
+ // (They exercise the communication protocol) TODO Remove???
+ // ----------------------------------------------------------------------------------------------------------------
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Helper methods
+ // ----------------------------------------------------------------------------------------------------------------
+
+ private TSOProto.Request createRetryCommitRequest(long ts) {
+ return createCommitRequest(ts, true, testWriteSet);
+ }
+
+ private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
+ TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
+ TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
+ commitBuilder.setStartTimestamp(ts);
+ commitBuilder.setIsRetry(retry);
+ for (CellId cell : writeSet) {
+ commitBuilder.addCellId(cell.getCellId());
+ }
+ return builder.setCommitRequest(commitBuilder.build()).build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java
new file mode 100644
index 0000000..cf05a9a
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import org.apache.omid.tso.ProgrammableTSOServer;
+import org.apache.omid.tso.ProgrammableTSOServer.AbortResponse;
+import org.apache.omid.tso.ProgrammableTSOServer.CommitResponse;
+import org.apache.omid.tso.ProgrammableTSOServer.TimestampResponse;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestTSOClientResponseHandling {
+
+ private static final int TSO_PORT = 4321;
+ private static final long START_TS = 1L;
+ private static final long COMMIT_TS = 2L;
+
+ private ProgrammableTSOServer tsoServer = new ProgrammableTSOServer(TSO_PORT);
+ // Client under test
+ private TSOClient tsoClient;
+
+ @BeforeClass
+ public void configureAndCreateClient() throws IOException, InterruptedException {
+
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionString("localhost:" + TSO_PORT);
+ tsoClient = TSOClient.newInstance(tsoClientConf);
+ }
+
+ @BeforeMethod
+ public void reset() {
+ tsoServer.cleanResponses();
+ }
+
+ @Test
+ public void testTimestampRequestReceivingASuccessfulResponse() throws Exception {
+ // test request timestamp response returns a timestamp
+
+ // Program the TSO to return an ad-hoc Timestamp response
+ tsoServer.queueResponse(new TimestampResponse(START_TS));
+
+ long startTS = tsoClient.getNewStartTimestamp().get();
+ assertEquals(startTS, START_TS);
+ }
+
+ @Test
+ public void testCommitRequestReceivingAnAbortResponse() throws Exception {
+ // test commit request which is aborted on the server side
+ // (e.g. due to conflicts with other transaction) throws an
+ // execution exception with an AbortException as a cause
+
+ // Program the TSO to return an Abort response
+ tsoServer.queueResponse(new AbortResponse(START_TS));
+
+ try {
+ tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
+ } catch (ExecutionException ee) {
+ assertEquals(ee.getCause().getClass(), AbortException.class);
+ }
+ }
+
+ @Test
+ public void testCommitRequestReceivingASuccessfulResponse() throws Exception {
+ // test commit request which is successfully committed on the server
+ // side returns a commit timestamp
+
+ // Program the TSO to return an Commit response (with no required heuristic actions)
+ tsoServer.queueResponse(new CommitResponse(false, START_TS, COMMIT_TS));
+
+ long commitTS = tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
+ assertEquals(commitTS, COMMIT_TS);
+ }
+
+ @Test
+ public void testCommitRequestReceivingAHeuristicResponse() throws Exception {
+ // test commit request which needs heuristic actions from the client
+ // throws an execution exception with a NewTSOException as a cause
+
+ // Program the TSO to return an Commit response requiring heuristic actions
+ tsoServer.queueResponse(new CommitResponse(true, START_TS, COMMIT_TS));
+ try {
+ tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
+ } catch (ExecutionException ee) {
+ assertEquals(ee.getCause().getClass(), NewTSOException.class);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestUnconnectedTSOClient.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestUnconnectedTSOClient.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestUnconnectedTSOClient.java
new file mode 100644
index 0000000..883d45f
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestUnconnectedTSOClient.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import org.apache.omid.tso.util.DummyCellIdImpl;
+import org.apache.statemachine.StateMachine.FsmImpl;
+import org.slf4j.Logger;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+/**
+ * Test the behavior of requests on a TSOClient component that is not connected to a TSO server.
+ */
+public class TestUnconnectedTSOClient {
+
+ private static final Logger LOG = getLogger(TestUnconnectedTSOClient.class);
+
+ private static final int TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST = 2;
+
+ @Test(timeOut = 30_000) // 30 secs
+ public void testRequestsDoneOnAnUnconnectedTSOClientAlwaysReturn() throws Exception {
+
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionString("localhost:12345");
+ tsoClientConf.setReconnectionDelayInSecs(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST);
+
+ // Component under test
+ TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+ // Internal accessor to fsm
+ FsmImpl fsm = (FsmImpl) tsoClient.fsm;
+
+ assertEquals(fsm.getState().getClass(), TSOClient.DisconnectedState.class);
+
+ // Test requests to the 3 relevant methods in TSO client
+
+ try {
+ tsoClient.getNewStartTimestamp().get();
+ fail();
+ } catch (ExecutionException e) {
+ LOG.info("Exception expected");
+ assertEquals(e.getCause().getClass(), ConnectionException.class);
+ TimeUnit.SECONDS.sleep(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST * 2);
+ assertEquals(fsm.getState().getClass(), TSOClient.DisconnectedState.class);
+ }
+
+ try {
+ tsoClient.commit(1, newHashSet(new DummyCellIdImpl(0xdeadbeefL))).get();
+ fail();
+ } catch (ExecutionException e) {
+ LOG.info("Exception expected");
+ assertEquals(e.getCause().getClass(), ConnectionException.class);
+ TimeUnit.SECONDS.sleep(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST * 2);
+ assertEquals(fsm.getState().getClass(), TSOClient.DisconnectedState.class);
+ }
+
+ tsoClient.close().get();
+ LOG.info("No exception expected");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tso-server/src/test/resources/log4j.properties b/tso-server/src/test/resources/log4j.properties
index 8273243..5f7911e 100644
--- a/tso-server/src/test/resources/log4j.properties
+++ b/tso-server/src/test/resources/log4j.properties
@@ -40,14 +40,13 @@ log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.bookkeeper=FATAL
log4j.logger.org.apache.hadoop.hbase=ERROR
log4j.logger.org.apache.hadoop.ipc=ERROR
-
-log4j.logger.com.yahoo.omid=INFO
-#log4j.logger.com.yahoo.omid.regionserver.TransactionalRegionServer=TRACE
-#log4j.logger.com.yahoo.omid.TestBasicTransaction=TRACE
-#log4j.logger.com.yahoo.omid.client.TSOClient=TRACE
-#log4j.logger.com.yahoo.omid.client.TransactionState=TRACE
-#log4j.logger.com.yahoo.omid.OmidTestBase=TRACE
-#log4j.logger.com.yahoo.omid.tso.ThroughputMonitor=INFO
+log4j.logger.org.apache.omid=INFO
+#log4j.logger.org.apache.omid.regionserver.TransactionalRegionServer=TRACE
+#log4j.logger.org.apache.omid.TestBasicTransaction=TRACE
+#log4j.logger.org.apache.omid.client.TSOClient=TRACE
+#log4j.logger.org.apache.omid.client.TransactionState=TRACE
+#log4j.logger.org.apache.omid.OmidTestBase=TRACE
+#log4j.logger.org.apache.omid.tso.ThroughputMonitor=INFO
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
# Make these two classes INFO-level. Make them DEBUG to see more zk debug.
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/resources/test-omid.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/test/resources/test-omid.yml b/tso-server/src/test/resources/test-omid.yml
index 9db7bd6..4729bb1 100644
--- a/tso-server/src/test/resources/test-omid.yml
+++ b/tso-server/src/test/resources/test-omid.yml
@@ -9,13 +9,13 @@ maxBatchSize: 500
batchPersistTimeoutInMs: 100
networkIfaceName: eth1
-commitTableStoreModule: !!com.yahoo.omid.committable.hbase.DefaultHBaseCommitTableStorageModule
+commitTableStoreModule: !!org.apache.omid.committable.hbase.DefaultHBaseCommitTableStorageModule
tableName: "sieve_omid:OMID_TIMESTAMP_F"
-timestampStoreModule: !!com.yahoo.omid.timestamp.storage.DefaultHBaseTimestampStorageModule
+timestampStoreModule: !!org.apache.omid.timestamp.storage.DefaultHBaseTimestampStorageModule
tableName: "sieve_omid:OMID_COMMIT_TABLE_F"
familyName: "MAX_TIMESTAMP_F"
-leaseModule: !!com.yahoo.omid.tso.VoidLeaseManagementModule [ ]
+leaseModule: !!org.apache.omid.tso.VoidLeaseManagementModule [ ]
-metrics: !!com.yahoo.omid.metrics.NullMetricsProvider [ ]
\ No newline at end of file
+metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
\ No newline at end of file