You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2018/12/03 13:46:30 UTC
[35/50] [abbrv] incubator-omid git commit: OMID-90 Add omid low
latency mode
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
new file mode 100644
index 0000000..17c70f0
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.proto.TSOProto;
+import org.apache.omid.tso.client.CellId;
+import org.apache.omid.tso.client.TSOClient;
+import org.apache.omid.tso.client.TSOClientOneShot;
+import org.apache.omid.tso.util.DummyCellIdImpl;
+import org.testng.annotations.Test;
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import java.util.Set;
+import static org.testng.Assert.assertTrue;
+
+public class TestTSOLL {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTSOLL.class);
+
+ private static final String TSO_SERVER_HOST = "localhost";
+ private static final int TSO_SERVER_PORT = 1234;
+
+
+ private OmidClientConfiguration tsoClientConf;
+
+ // Required infrastructure for TSOClient test
+ private TSOServer tsoServer;
+ private PausableTimestampOracle pausableTSOracle;
+ private CommitTable commitTable;
+
+ private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
+ private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
+
+ private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
+
+ @Mock
+ ReplyProcessor replyProcessor;
+
+ @BeforeMethod
+ public void beforeMethod() throws Exception {
+
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ tsoConfig.setLowLatency(true);
+ tsoConfig.setConflictMapSize(1000);
+ tsoConfig.setPort(TSO_SERVER_PORT);
+ tsoConfig.setNumConcurrentCTWriters(2);
+ Module tsoServerMockModule = new TSOMockModule(tsoConfig);
+ Injector injector = Guice.createInjector(tsoServerMockModule);
+
+ LOG.info("==================================================================================================");
+ LOG.info("======================================= Init TSO Server ==========================================");
+ LOG.info("==================================================================================================");
+
+ tsoServer = injector.getInstance(TSOServer.class);
+ tsoServer.startAndWait();
+ TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
+
+ LOG.info("==================================================================================================");
+ LOG.info("===================================== TSO Server Initialized =====================================");
+ LOG.info("==================================================================================================");
+
+ pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
+ commitTable = injector.getInstance(CommitTable.class);
+
+ OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+ tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+
+ this.tsoClientConf = tsoClientConf;
+ commitTable = injector.getInstance(CommitTable.class);
+ replyProcessor = injector.getInstance(ReplyProcessor.class);
+ }
+
+ @AfterMethod
+ public void afterMethod() throws Exception {
+
+
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
+
+ pausableTSOracle.resume();
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testNoWriteToCommitTable() throws Exception {
+
+ TSOClient client = TSOClient.newInstance(tsoClientConf);
+ TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+ long ts1 = client.getNewStartTimestamp().get();
+
+ TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
+ assertTrue(response1.getCommitResponse().hasCommitTimestamp());
+ Optional<CommitTable.CommitTimestamp> cts = commitTable.getClient().getCommitTimestamp(ts1).get();
+
+ assertTrue(cts.isPresent() == false);
+ }
+
+ private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
+ TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
+ TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
+ commitBuilder.setStartTimestamp(ts);
+ commitBuilder.setIsRetry(retry);
+ for (CellId cell : writeSet) {
+ commitBuilder.addCellId(cell.getCellId());
+ }
+ return builder.setCommitRequest(commitBuilder.build()).build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
index c4c9c61..3ae8968 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
@@ -181,21 +181,29 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
@Test(timeOut = 30_000)
public void testCommitWritesToCommitTable() throws Exception {
+
long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
- assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
- "Commit TS for Tx1 shouldn't appear in Commit Table");
+ if (!tsoClient.isLowLatency())
+ assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
+ "Commit TS for Tx1 shouldn't appear in Commit Table");
long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
- Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
- assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
- assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
- "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
- assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+ if (!tsoClient.isLowLatency()) {
+ Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
+ assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
+ assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
+ "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
+ assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+ } else {
+ assertTrue(commitTsForTx1 > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+ }
+
+
}
@Test(timeOut = 30_000)
@@ -265,7 +273,7 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
- tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
+ Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
try {
tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
@@ -275,7 +283,8 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
- long commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
+ if (!tsoClient.isLowLatency())
+ commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
index a2da056..080c23e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -257,9 +257,13 @@ public class TestTSOClientRequestAndResponseBehaviours {
TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
- assertEquals(response2.getCommitResponse().getCommitTimestamp(),
- response1.getCommitResponse().getCommitTimestamp(),
- "Commit timestamp should be the same");
+ if (client.isLowLatency()) {
+ assertTrue(response1.hasCommitResponse());
+ assertTrue(response2.getCommitResponse().getAborted());
+ } else
+ assertEquals(response2.getCommitResponse().getCommitTimestamp(),
+ response1.getCommitResponse().getCommitTimestamp(),
+ "Commit timestamp should be the same");
}
// ----------------------------------------------------------------------------------------------------------------
@@ -270,8 +274,9 @@ public class TestTSOClientRequestAndResponseBehaviours {
public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
TSOClient client = TSOClient.newInstance(tsoClientConf);
-
long ts1 = client.getNewStartTimestamp().get();
+ if(client.isLowLatency())
+ return;
pausableTSOracle.pause();
TSOFuture<Long> future = client.commit(ts1, testWriteSet);
TSOClientAccessor.closeChannel(client);
@@ -349,8 +354,13 @@ public class TestTSOClientRequestAndResponseBehaviours {
clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
- assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
- assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
+ if (client.isLowLatency())
+ assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
+ else {
+ assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
+ assertEquals(response.getCommitResponse().getCommitTimestamp(),
+ tx1ST + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
+ }
}
@Test(timeOut = 30_000)