You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2018/12/03 13:46:30 UTC

[35/50] [abbrv] incubator-omid git commit: OMID-90 Add omid low latency mode

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
new file mode 100644
index 0000000..17c70f0
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.proto.TSOProto;
+import org.apache.omid.tso.client.CellId;
+import org.apache.omid.tso.client.TSOClient;
+import org.apache.omid.tso.client.TSOClientOneShot;
+import org.apache.omid.tso.util.DummyCellIdImpl;
+import org.testng.annotations.Test;
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import java.util.Set;
+import static org.testng.Assert.assertTrue;
+
+/** Verifies that a TSO server started in low-latency mode answers commits without writing to the commit table. */
+public class TestTSOLL {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestTSOLL.class);
+
+    private static final String TSO_SERVER_HOST = "localhost";
+    private static final int TSO_SERVER_PORT = 1234;
+
+    // Client configuration pointing at the server started in beforeMethod()
+    private OmidClientConfiguration tsoClientConf;
+
+    // Required infrastructure for TSOClient test
+    private TSOServer tsoServer;
+    private PausableTimestampOracle pausableTSOracle;
+    private CommitTable commitTable;
+
+    private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
+    private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
+
+    private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
+
+    @Mock
+    ReplyProcessor replyProcessor;
+
+    /** Starts a low-latency TSO server on TSO_SERVER_PORT and collects the injected collaborators used by the test. */
+    @BeforeMethod
+    public void beforeMethod() throws Exception {
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setLowLatency(true);
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setPort(TSO_SERVER_PORT);
+        tsoConfig.setNumConcurrentCTWriters(2);
+        Module tsoServerMockModule = new TSOMockModule(tsoConfig);
+        Injector injector = Guice.createInjector(tsoServerMockModule);
+
+        LOG.info("==================================================================================================");
+        LOG.info("======================================= Init TSO Server ==========================================");
+        LOG.info("==================================================================================================");
+
+        tsoServer = injector.getInstance(TSOServer.class);
+        tsoServer.startAndWait();
+        TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
+
+        LOG.info("==================================================================================================");
+        LOG.info("===================================== TSO Server Initialized =====================================");
+        LOG.info("==================================================================================================");
+
+        pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
+        commitTable = injector.getInstance(CommitTable.class);
+        // The field is (re)assigned from the injector here, regardless of its @Mock annotation
+        replyProcessor = injector.getInstance(ReplyProcessor.class);
+
+        // Assign the field directly; a same-named local would only shadow it
+        tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+    }
+
+    /** Stops the TSO server and waits until TSO_SERVER_PORT is no longer listening. */
+    @AfterMethod
+    public void afterMethod() throws Exception {
+        tsoServer.stopAndWait();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
+
+        // Resume unconditionally (nothing in this class pauses the oracle)
+        pausableTSOracle.resume();
+    }
+
+    /** Commits a transaction and checks that no commit timestamp for it can be read back from the commit table. */
+    @Test(timeOut = 30_000)
+    public void testNoWriteToCommitTable() throws Exception {
+        TSOClient client = TSOClient.newInstance(tsoClientConf);
+        TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+        long ts1 = client.getNewStartTimestamp().get();
+
+        TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
+        assertTrue(response1.getCommitResponse().hasCommitTimestamp(), "Commit response should carry a commit TS");
+        Optional<CommitTable.CommitTimestamp> cts = commitTable.getClient().getCommitTimestamp(ts1).get();
+
+        assertTrue(!cts.isPresent(), "Low-latency TSO should not write the commit TS to the Commit Table");
+    }
+
+    /** Builds a protobuf commit request for start timestamp {@code ts} carrying the cell ids of {@code writeSet}. */
+    private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
+        TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
+        TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
+        commitBuilder.setStartTimestamp(ts);
+        commitBuilder.setIsRetry(retry);
+        for (CellId cell : writeSet) {
+            commitBuilder.addCellId(cell.getCellId());
+        }
+        return builder.setCommitRequest(commitBuilder.build()).build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
index c4c9c61..3ae8968 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
@@ -181,21 +181,29 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
 
     @Test(timeOut = 30_000)
     public void testCommitWritesToCommitTable() throws Exception {
+        // Commit Table assertions below are only made when the TSO client is not in low-latency mode
         long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
         long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
         assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
 
-        assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
-                "Commit TS for Tx1 shouldn't appear in Commit Table");
+        if (!tsoClient.isLowLatency()) {
+            assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
+                    "Commit TS for Tx1 shouldn't appear in Commit Table");
+        }
 
         long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
         assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
 
-        Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
-        assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
-        assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
-                "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
-        assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+        if (!tsoClient.isLowLatency()) {
+            Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
+            assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
+            assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
+                    "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
+            assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+        } else {
+            // The Commit Table is not consulted in this branch; check the TS returned by commit() instead
+            assertTrue(commitTsForTx1 > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+        }
     }
 
     @Test(timeOut = 30_000)
@@ -265,7 +273,7 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
         long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
         long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
 
-        tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
+        Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
         try {
             tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
             Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
@@ -275,7 +283,8 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
         long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
 
         assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
-        long commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
+        if (!tsoClient.isLowLatency())
+            commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
         assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
         assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
index a2da056..080c23e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -257,9 +257,13 @@ public class TestTSOClientRequestAndResponseBehaviours {
 
         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
-        assertEquals(response2.getCommitResponse().getCommitTimestamp(),
-                     response1.getCommitResponse().getCommitTimestamp(),
-                     "Commit timestamp should be the same");
+        if (client.isLowLatency()) {
+            assertTrue(response1.hasCommitResponse());
+            assertTrue(response2.getCommitResponse().getAborted());
+        } else
+            assertEquals(response2.getCommitResponse().getCommitTimestamp(),
+                    response1.getCommitResponse().getCommitTimestamp(),
+                    "Commit timestamp should be the same");
     }
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -270,8 +274,9 @@ public class TestTSOClientRequestAndResponseBehaviours {
     public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
 
         TSOClient client = TSOClient.newInstance(tsoClientConf);
-
         long ts1 = client.getNewStartTimestamp().get();
+        if(client.isLowLatency())
+            return;
         pausableTSOracle.pause();
         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
         TSOClientAccessor.closeChannel(client);
@@ -349,8 +354,13 @@ public class TestTSOClientRequestAndResponseBehaviours {
 
         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
-        assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
-        assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
+        if (client.isLowLatency())
+            assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
+        else {
+            assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
+            assertEquals(response.getCommitResponse().getCommitTimestamp(),
+                    tx1ST + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
+        }
     }
 
     @Test(timeOut = 30_000)