You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:17 UTC
[10/50] [abbrv] incubator-omid git commit: Remove heuristic decision
Remove heuristic decision
When in HA mode and the TSO loses mastership, the instance commits suicide.
Change-Id: I63b550da440986f76a9c6243df3778a71f1459e6
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/55e6644f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/55e6644f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/55e6644f
Branch: refs/heads/master
Commit: 55e6644fe2f102465c5e69751c9f72aac6fc8e9d
Parents: 811ec48
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Mon Apr 25 13:49:05 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Tue Apr 26 17:08:45 2016 -0700
----------------------------------------------------------------------
common/src/main/proto/TSOProto.proto | 1 -
.../apache/omid/transaction/TestTSOModule.java | 4 +-
.../omid/transaction/TestTxMgrFailover.java | 189 +------------------
.../transaction/AbstractTransactionManager.java | 7 +-
.../apache/omid/tso/client/NewTSOException.java | 27 ---
.../org/apache/omid/tso/client/TSOClient.java | 9 +-
.../java/org/apache/omid/tso/MockPanicker.java | 2 +-
.../omid/tso/PersistenceProcessorImpl.java | 48 ++---
.../org/apache/omid/tso/ReplyProcessor.java | 8 +-
.../org/apache/omid/tso/ReplyProcessorImpl.java | 26 +--
.../org/apache/omid/tso/RetryProcessorImpl.java | 2 +-
.../omid/tso/RuntimeExceptionPanicker.java | 2 +-
.../apache/omid/tso/ProgrammableTSOServer.java | 14 +-
.../java/org/apache/omid/tso/TestBatch.java | 39 +---
.../org/apache/omid/tso/TestLeaseManager.java | 5 +-
.../omid/tso/TestPersistenceProcessor.java | 112 ++++++++---
.../org/apache/omid/tso/TestRetryProcessor.java | 2 +-
...stTSOClientRequestAndResponseBehaviours.java | 3 -
.../client/TestTSOClientResponseHandling.java | 17 +-
19 files changed, 140 insertions(+), 377 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/common/src/main/proto/TSOProto.proto
----------------------------------------------------------------------
diff --git a/common/src/main/proto/TSOProto.proto b/common/src/main/proto/TSOProto.proto
index 43987d8..749beaa 100644
--- a/common/src/main/proto/TSOProto.proto
+++ b/common/src/main/proto/TSOProto.proto
@@ -49,7 +49,6 @@ message CommitResponse {
optional bool aborted = 1;
optional int64 startTimestamp = 2;
optional int64 commitTimestamp = 3;
- optional bool makeHeuristicDecision = 4 [default = false];
}
message Capabilities {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
index 4c38efa..2d4479a 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
@@ -26,7 +26,7 @@ import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.timestamp.storage.HBaseTimestampStorage;
import org.apache.omid.timestamp.storage.TimestampStorage;
import org.apache.omid.tso.DisruptorModule;
-import org.apache.omid.tso.MockPanicker;
+import org.apache.omid.tso.RuntimeExceptionPanicker;
import org.apache.omid.tso.NetworkInterfaceUtils;
import org.apache.omid.tso.Panicker;
import org.apache.omid.tso.PausableTimestampOracle;
@@ -64,7 +64,7 @@ class TestTSOModule extends AbstractModule {
bind(CommitTable.class).to(HBaseCommitTable.class).in(Singleton.class);
bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class);
bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class);
- bind(Panicker.class).to(MockPanicker.class).in(Singleton.class);
+ bind(Panicker.class).to(RuntimeExceptionPanicker.class).in(Singleton.class);
// Disruptor setup
install(new DisruptorModule());
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
index f37808d..d507c24 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
@@ -17,15 +17,6 @@
*/
package org.apache.omid.transaction;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.omid.TestUtils;
-import org.apache.omid.committable.CommitTable;
-import org.apache.omid.committable.CommitTable.CommitTimestamp;
-import org.apache.omid.committable.InMemoryCommitTable;
-import org.apache.omid.transaction.Transaction.Status;
-import org.apache.omid.tso.ProgrammableTSOServer;
-import org.apache.omid.tso.client.TSOClient;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
@@ -34,6 +25,12 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.transaction.Transaction.Status;
+import org.apache.omid.tso.ProgrammableTSOServer;
+import org.apache.omid.tso.client.TSOClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass;
@@ -43,13 +40,9 @@ import org.testng.annotations.Test;
import javax.annotation.Nullable;
import java.io.IOException;
-import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@Test(groups = "sharedHBase")
@@ -71,7 +64,6 @@ public class TestTxMgrFailover extends OmidTestBase {
private InMemoryCommitTable commitTable;
private CommitTable.Client commitTableClient;
- private CommitTable.Writer commitTableWriter;
// Allows to prepare the required responses to client request operations
private ProgrammableTSOServer tso;
@@ -93,7 +85,6 @@ public class TestTxMgrFailover extends OmidTestBase {
commitTable = new InMemoryCommitTable(); // Use an in-memory commit table to speed up tests
commitTableClient = spy(commitTable.getClient());
- commitTableWriter = spy(commitTable.getWriter());
HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
@@ -140,174 +131,6 @@ public class TestTxMgrFailover extends OmidTestBase {
}
- @Test(timeOut = 30_000)
- public void testClientReceivesSuccessfulCommitForNonInvalidatedTxCommittedByPreviousTSO() throws Exception {
-
- // Program the TSO to return an ad-hoc Timestamp and an commit response with heuristic actions
- tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
- tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
- // Simulate that tx1 was committed by writing to commit table
- commitTableWriter.addCommittedTransaction(TX1_ST, TX1_CT);
- commitTableWriter.flush();
- assertEquals(commitTable.countElements(), 1, "Rows should be 1!");
-
- try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
- HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
- assertEquals(tx1.getStartTimestamp(), TX1_ST);
- Put put = new Put(row1);
- put.add(TEST_FAMILY.getBytes(), qualifier, data1);
- txTable.put(tx1, put);
- // Should succeed
- tm.commit(tx1);
-
- // Check transaction status
- assertEquals(tx1.getStatus(), Status.COMMITTED);
- assertEquals(tx1.getCommitTimestamp(), TX1_CT);
- // Check the cleanup process did its job and the committed data is there
- // Note that now we do not clean up the commit table when exercising the heuristic actions
- assertEquals(commitTable.countElements(), 1,
- "Rows should be 1! We don't have to clean CT in this case");
- Optional<CommitTimestamp>
- optionalCT =
- tm.commitTableClient.getCommitTimestamp(TX1_ST).get();
- assertTrue(optionalCT.isPresent());
- checkOperationSuccessOnCell(KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
- qualifier);
- }
-
- }
-
- @Test(timeOut = 30_000)
- public void testClientReceivesRollbackExceptionForInvalidatedTxCommittedByPreviousTSO() throws Exception {
-
- // Program the TSO to return an ad-hoc Timestamp and a commit response with heuristic actions
- tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
- tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
- // Simulate that tx1 was committed by writing to commit table but was later invalidated
- commitTableClient.tryInvalidateTransaction(TX1_ST);
- assertEquals(commitTable.countElements(), 1, "Rows should be 1!");
-
- executeTxAndCheckRollback();
-
- }
-
- @Test(timeOut = 30_000)
- public void testClientReceivesNotificationOfANewTSOCanInvalidateTransaction() throws Exception {
-
- // Program the TSO to return an ad-hoc Timestamp and a commit response with heuristic actions
- tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
- tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
-
- assertEquals(commitTable.countElements(), 0, "Rows should be 0!");
-
- executeTxAndCheckRollback();
-
- }
-
- private void executeTxAndCheckRollback() throws IOException, InterruptedException, java.util.concurrent.ExecutionException {
- try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
- HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
- assertEquals(tx1.getStartTimestamp(), TX1_ST);
- Put put = new Put(row1);
- put.add(TEST_FAMILY.getBytes(), qualifier, data1);
- txTable.put(tx1, put);
- try {
- tm.commit(tx1);
- fail();
- } catch (RollbackException e) {
- // Expected
- }
-
- // Check transaction status
- assertEquals(tx1.getStatus(), Status.ROLLEDBACK);
- assertEquals(tx1.getCommitTimestamp(), 0);
- // Check the cleanup process did its job and the uncommitted data is NOT there
- assertEquals(commitTable.countElements(), 1, "Rows should be 1! Dirty data should be there");
- Optional<CommitTimestamp>
- optionalCT =
- tm.commitTableClient.getCommitTimestamp(TX1_ST).get();
- assertTrue(optionalCT.isPresent());
- assertFalse(optionalCT.get().isValid());
- checkOperationSuccessOnCell(KeyValue.Type.Delete, null, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
- qualifier);
- }
- }
-
- @Test(timeOut = 30_000)
- public void testClientSuccessfullyCommitsWhenReceivingNotificationOfANewTSOAandCANTInvalidateTransaction()
- throws Exception {
-
- // Program the TSO to return an ad-hoc Timestamp and a commit response with heuristic actions
- tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
- tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
-
- // Simulate that the original TSO was able to add the tx to commit table in the meantime
- commitTableWriter.addCommittedTransaction(TX1_ST, TX1_CT);
- commitTableWriter.flush();
- assertEquals(commitTable.countElements(), 1, "Rows should be 1!");
- SettableFuture<Optional<CommitTimestamp>> f1 = SettableFuture.create();
- f1.set(Optional.<CommitTimestamp>absent());
- SettableFuture<Optional<CommitTimestamp>> f2 = SettableFuture.create();
- f2.set(Optional.of(new CommitTimestamp(COMMIT_TABLE, TX1_CT, true)));
- doReturn(f1).doReturn(f2).when(commitTableClient).getCommitTimestamp(TX1_ST);
-
- try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
- HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
- assertEquals(tx1.getStartTimestamp(), TX1_ST);
- Put put = new Put(row1);
- put.add(TEST_FAMILY.getBytes(), qualifier, data1);
- txTable.put(tx1, put);
-
- tm.commit(tx1);
-
- // Check transaction status
- assertEquals(tx1.getStatus(), Status.COMMITTED);
- assertEquals(tx1.getCommitTimestamp(), TX1_CT);
- // Check the cleanup process did its job and the committed data is there
- // Note that now we do not clean up the commit table when exercising the heuristic actions
- assertEquals(commitTable.countElements(), 1,
- "Rows should be 1! We don't have to clean CT in this case");
- checkOperationSuccessOnCell(KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
- qualifier);
- }
-
- }
-
- @Test(timeOut = 30_000)
- public void testClientReceivesATransactionExceptionWhenReceivingNotificationOfANewTSOAndCANTInvalidateTransactionAndCTCheckIsUnsuccessful()
- throws Exception {
-
- // Program the TSO to return an ad-hoc Timestamp and a commit response with heuristic actions
- tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
- tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
-
- // Simulate that the original TSO was able to add the tx to commit table in the meantime
- SettableFuture<Boolean> f = SettableFuture.create();
- f.set(false);
- doReturn(f).when(commitTableClient).tryInvalidateTransaction(TX1_ST);
-
- assertEquals(commitTable.countElements(), 0, "Rows should be 0!");
-
- try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
- HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
- assertEquals(tx1.getStartTimestamp(), TX1_ST);
- Put put = new Put(row1);
- put.add(TEST_FAMILY.getBytes(), qualifier, data1);
- txTable.put(tx1, put);
- try {
- tm.commit(tx1);
- fail();
- } catch (TransactionException e) {
- // Expected but is not good because we're not able to determine the tx outcome
- }
-
- // Check transaction status
- assertEquals(tx1.getStatus(), Status.RUNNING);
- assertEquals(tx1.getCommitTimestamp(), 0);
- }
-
- }
-
// ----------------------------------------------------------------------------------------------------------------
// Helper methods
// ----------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index 1157acb..4245917 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -29,7 +29,6 @@ import org.apache.omid.transaction.Transaction.Status;
import org.apache.omid.tso.client.AbortException;
import org.apache.omid.tso.client.CellId;
import org.apache.omid.tso.client.ConnectionException;
-import org.apache.omid.tso.client.NewTSOException;
import org.apache.omid.tso.client.ServiceUnavailableException;
import org.apache.omid.tso.client.TSOClient;
import org.slf4j.Logger;
@@ -417,11 +416,7 @@ public abstract class AbstractTransactionManager implements TransactionManager {
throw new RollbackException("Conflicts detected in tx writeset", e.getCause());
}
- if (e.getCause() instanceof ServiceUnavailableException
- ||
- e.getCause() instanceof NewTSOException
- ||
- e.getCause() instanceof ConnectionException) {
+ if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
errorTxsCounter.inc();
try {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/transaction-client/src/main/java/org/apache/omid/tso/client/NewTSOException.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/NewTSOException.java b/transaction-client/src/main/java/org/apache/omid/tso/client/NewTSOException.java
deleted file mode 100644
index 023257e..0000000
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/NewTSOException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.omid.tso.client;
-
-/**
- * Thrown when a new TSO has been detected
- */
-public class NewTSOException extends Exception {
-
- private static final long serialVersionUID = -3250655858200759321L;
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 260df53..67260fa 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -691,14 +691,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
if (resp.getCommitResponse().getAborted()) {
e.getRequest().error(new AbortException());
} else {
- // Check if the commit response received implies heuristic
- // actions during commit (because there's a new TSO master
- // replica) and inform the caller (e.g. the TxMgr) about it
- if (resp.getCommitResponse().getMakeHeuristicDecision()) {
- e.getRequest().error(new NewTSOException());
- } else {
- e.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
- }
+ e.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/main/java/org/apache/omid/tso/MockPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MockPanicker.java b/tso-server/src/main/java/org/apache/omid/tso/MockPanicker.java
index fdf8476..c5278e8 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/MockPanicker.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/MockPanicker.java
@@ -26,7 +26,7 @@ public class MockPanicker implements Panicker {
@Override
public void panic(String reason) {
- LOG.error("PANICKING: {}", reason);
+ panic(reason, new Throwable("Mock Panicker"));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index ca0862f..bc1d43c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -202,36 +202,29 @@ class PersistenceProcessorImpl
}
synchronized private void flush() {
- lastFlush.set(System.nanoTime());
- boolean areWeStillMaster = true;
- if (!leaseManager.stillInLeasePeriod()) {
- // The master TSO replica has changed, so we must inform the
- // clients about it when sending the replies and avoid flushing
- // the current batch of TXs
- areWeStillMaster = false;
- // We need also to clear the data in the buffer
- writer.clearWriteBuffer();
- LOG.trace("Replica {} lost mastership before flushing data", tsoHostAndPort);
- } else {
+ if (batch.getNumEvents() > 0) {
+ lastFlush.set(System.nanoTime());
+ commitSuicideIfNotMaster();
try {
writer.flush();
+ batchSizeHistogram.update(batch.getNumEvents());
} catch (IOException e) {
- panicker.panic("Error persisting commit batch", e.getCause());
- }
- batchSizeHistogram.update(batch.getNumEvents());
- if (!leaseManager.stillInLeasePeriod()) {
- // If after flushing this TSO server is not the master
- // replica we need inform the client about it
- areWeStillMaster = false;
- LOG.warn("Replica {} lost mastership after flushing data", tsoHostAndPort);
+ panicker.panic("Error persisting commit batch", e);
}
+ commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide
+ flushTimer.update((System.nanoTime() - lastFlush.get()));
+ batch.sendRepliesAndReset(reply, retryProc);
}
- flushTimer.update((System.nanoTime() - lastFlush.get()));
- batch.sendRepliesAndReset(reply, retryProc, areWeStillMaster);
}
+ private void commitSuicideIfNotMaster() {
+ if (!leaseManager.stillInLeasePeriod()) {
+ panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
+ }
+ }
+
@Override
public void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
long seq = persistRing.next();
@@ -273,6 +266,7 @@ class PersistenceProcessorImpl
Batch(int maxBatchSize) {
assert (maxBatchSize > 0);
+ LOG.info("Creating the Batch with {} elements", maxBatchSize);
this.maxBatchSize = maxBatchSize;
events = new PersistEvent[maxBatchSize];
numEvents = 0;
@@ -319,7 +313,7 @@ class PersistenceProcessorImpl
PersistEvent.makePersistTimestamp(e, startTimestamp, c, monCtx);
}
- void sendRepliesAndReset(ReplyProcessor reply, RetryProcessor retryProc, boolean isTSOInstanceMaster) {
+ void sendRepliesAndReset(ReplyProcessor reply, RetryProcessor retryProc) {
for (int i = 0; i < numEvents; i++) {
PersistEvent e = events[i];
switch (e.getType()) {
@@ -329,14 +323,8 @@ class PersistenceProcessorImpl
break;
case COMMIT:
e.getMonCtx().timerStop("commitPersistProcessor");
- if (isTSOInstanceMaster) {
- reply.commitResponse(false, e.getStartTimestamp(), e.getCommitTimestamp(), e.getChannel(),
- e.getMonCtx());
- } else {
- // The client will need to perform heuristic actions to determine the output
- reply.commitResponse(true, e.getStartTimestamp(), e.getCommitTimestamp(), e.getChannel(),
- e.getMonCtx());
- }
+
+ reply.commitResponse(e.getStartTimestamp(), e.getCommitTimestamp(), e.getChannel(), e.getMonCtx());
break;
case ABORT:
if (e.isRetry()) {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index c2de5f2..6aa6faa 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -21,12 +21,8 @@ import org.jboss.netty.channel.Channel;
interface ReplyProcessor {
/**
- * Informs the client about the outcome of the Tx it was trying to
- * commit. If the heuristic decision flat is enabled, the client
- * will need to do additional actions for learning the final outcome.
+ * Informs the client about the outcome of the Tx it was trying to commit.
*
- * @param makeHeuristicDecision
- * informs about whether heuristic actions are needed or not
* @param startTimestamp
* the start timestamp of the transaction (a.k.a. tx id)
* @param commitTimestamp
@@ -34,7 +30,7 @@ interface ReplyProcessor {
* @param channel
* the communication channed with the client
*/
- void commitResponse(boolean makeHeuristicDecision, long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx);
+ void commitResponse(long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx);
void abortResponse(long startTimestamp, Channel c, MonitoringContext monCtx);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index e6899cc..d87f9a7 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -72,12 +72,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>,
case COMMIT:
name = "commitReplyProcessor";
event.getMonCtx().timerStart(name);
- handleCommitResponse(false, event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
- break;
- case HEURISTIC_COMMIT:
- name = "commitReplyProcessor";
- event.getMonCtx().timerStart(name);
- handleCommitResponse(true, event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
+ handleCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
break;
case ABORT:
name = "abortReplyProcessor";
@@ -102,10 +97,10 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>,
}
@Override
- public void commitResponse(boolean makeHeuristicDecision, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+ public void commitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
long seq = replyRing.next();
ReplyEvent e = replyRing.get(seq);
- ReplyEvent.makeCommitResponse(makeHeuristicDecision, e, startTimestamp, commitTimestamp, c, monCtx);
+ ReplyEvent.makeCommitResponse(e, startTimestamp, commitTimestamp, c, monCtx);
replyRing.publish(seq);
}
@@ -125,12 +120,9 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>,
replyRing.publish(seq);
}
- void handleCommitResponse(boolean makeHeuristicDecision, long startTimestamp, long commitTimestamp, Channel c) {
+ void handleCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
- if (makeHeuristicDecision) { // If the commit is ambiguous is due to a new master TSO
- commitBuilder.setMakeHeuristicDecision(true);
- }
commitBuilder.setAborted(false)
.setStartTimestamp(startTimestamp)
.setCommitTimestamp(commitTimestamp);
@@ -164,7 +156,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>,
public final static class ReplyEvent {
enum Type {
- TIMESTAMP, COMMIT, HEURISTIC_COMMIT, ABORT
+ TIMESTAMP, COMMIT, ABORT
}
private Type type = null;
@@ -201,14 +193,10 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>,
e.monCtx = monCtx;
}
- static void makeCommitResponse(boolean makeHeuristicDecision, ReplyEvent e, long startTimestamp,
+ static void makeCommitResponse(ReplyEvent e, long startTimestamp,
long commitTimestamp, Channel c, MonitoringContext monCtx) {
- if (makeHeuristicDecision) {
- e.type = Type.HEURISTIC_COMMIT;
- } else {
- e.type = Type.COMMIT;
- }
+ e.type = Type.COMMIT;
e.startTimestamp = startTimestamp;
e.commitTimestamp = commitTimestamp;
e.channel = c;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 96d6570..7503d8a 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -110,7 +110,7 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
if (commitTimestamp.isPresent()) {
if (commitTimestamp.get().isValid()) {
LOG.trace("Valid commit TS found in Commit Table");
- replyProc.commitResponse(false, startTimestamp, commitTimestamp.get().getValue(),
+ replyProc.commitResponse(startTimestamp, commitTimestamp.get().getValue(),
event.getChannel(), event.getMonCtx());
} else {
LOG.trace("Invalid commit TS found in Commit Table");
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/main/java/org/apache/omid/tso/RuntimeExceptionPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RuntimeExceptionPanicker.java b/tso-server/src/main/java/org/apache/omid/tso/RuntimeExceptionPanicker.java
index 657f0d2..9f4be88 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RuntimeExceptionPanicker.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RuntimeExceptionPanicker.java
@@ -17,7 +17,7 @@
*/
package org.apache.omid.tso;
-class RuntimeExceptionPanicker implements Panicker {
+public class RuntimeExceptionPanicker implements Panicker {
@Override
public void panic(String reason) {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java b/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
index 47f0e3e..e973494 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
@@ -137,10 +137,7 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
switch (resp.type) {
case COMMIT:
CommitResponse commitResp = (CommitResponse) resp;
- sendCommitResponse(commitResp.heuristicDecissionRequired,
- commitResp.startTS,
- commitResp.commitTS,
- channel);
+ sendCommitResponse(commitResp.startTS, commitResp.commitTS, channel);
break;
case ABORT:
AbortResponse abortResp = (AbortResponse) resp;
@@ -198,12 +195,9 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
c.write(builder.build());
}
- private void sendCommitResponse(boolean makeHeuristicDecission, long startTimestamp, long commitTimestamp, Channel c) {
+ private void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
- if (makeHeuristicDecission) { // If the commit is ambiguous is due to a new master TSO
- commitBuilder.setMakeHeuristicDecision(true);
- }
commitBuilder.setAborted(false).setStartTimestamp(startTimestamp).setCommitTimestamp(commitTimestamp);
builder.setCommitResponse(commitBuilder.build());
c.write(builder.build());
@@ -246,13 +240,11 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
public static class CommitResponse extends Response {
- final boolean heuristicDecissionRequired;
final long startTS;
final long commitTS;
- public CommitResponse(boolean heuristicDecissionRequired, long startTS, long commitTS) {
+ public CommitResponse(long startTS, long commitTS) {
super(ResponseType.COMMIT);
- this.heuristicDecissionRequired = heuristicDecissionRequired;
this.startTS = startTS;
this.commitTS = commitTS;
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 4b012fe..0fe4a22 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -110,49 +110,14 @@ public class TestBatch {
}
// Test that sending replies empties the batch
- final boolean MASTER_INSTANCE = true;
- final boolean SHOULD_MAKE_HEURISTIC_DECISSION = true;
- batch.sendRepliesAndReset(replyProcessor, retryProcessor, MASTER_INSTANCE);
+ batch.sendRepliesAndReset(replyProcessor, retryProcessor);
verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
.timestampResponse(anyLong(), any(Channel.class), any(MonitoringContext.class));
verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
- .commitResponse(eq(!SHOULD_MAKE_HEURISTIC_DECISSION), anyLong(), anyLong(),
- any(Channel.class), any(MonitoringContext.class));
+ .commitResponse(anyLong(), anyLong(), any(Channel.class), any(MonitoringContext.class));
AssertJUnit.assertFalse("Batch shouldn't be full", batch.isFull());
AssertJUnit.assertEquals("Num events should be 0", 0, batch.getNumEvents());
}
- @Test
- public void testBatchFunctionalityWhenMastershipIsLost() {
- Channel channel = Mockito.mock(Channel.class);
-
- // Fill the batch with events till full
- for (int i = 0; i < BATCH_SIZE; i++) {
- if (i % 2 == 0) {
- MonitoringContext monCtx = new MonitoringContext(metrics);
- monCtx.timerStart("timestampPersistProcessor");
- batch.addTimestamp(i, channel, monCtx);
- } else {
- MonitoringContext monCtx = new MonitoringContext(metrics);
- monCtx.timerStart("commitPersistProcessor");
- batch.addCommit(i, i + 1, channel, monCtx);
- }
- }
-
- // Test that sending replies empties the batch also when the replica
- // is NOT master and calls the ambiguousCommitResponse() method on the
- // reply processor
- final boolean MASTER_INSTANCE = true;
- final boolean SHOULD_MAKE_HEURISTIC_DECISSION = true;
- batch.sendRepliesAndReset(replyProcessor, retryProcessor, !MASTER_INSTANCE);
- verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
- .timestampResponse(anyLong(), any(Channel.class), any(MonitoringContext.class));
- verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
- .commitResponse(eq(SHOULD_MAKE_HEURISTIC_DECISSION), anyLong(), anyLong(), any(Channel.class), any(
- MonitoringContext.class));
- assertFalse(batch.isFull(), "Batch shouldn't be full");
- assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java b/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
index 60cefca..62c3435 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
@@ -40,6 +40,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -359,7 +360,7 @@ public class TestLeaseManager {
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
ArgumentCaptor<IllegalArgumentException> trowableIAE = ArgumentCaptor.forClass(IllegalArgumentException.class);
- verify(panicker).panic(anyString(), trowableIAE.capture());
+ verify(panicker, times(2)).panic(anyString(), trowableIAE.capture());
assertTrue(trowableIAE.getValue() instanceof IllegalArgumentException);
assertTrue(trowableIAE.getValue().getMessage().contains("Incorrect TSO Info found"));
@@ -378,7 +379,7 @@ public class TestLeaseManager {
ArgumentCaptor<LeaseManagement.LeaseManagementException> trowableLME =
ArgumentCaptor.forClass(LeaseManagement.LeaseManagementException.class);
- verify(panicker).panic(anyString(), trowableLME.capture());
+ verify(panicker, times(2)).panic(anyString(), trowableLME.capture());
assertTrue(trowableLME.getValue() instanceof LeaseManagement.LeaseManagementException);
assertTrue(trowableLME.getValue().getMessage().contains("Another TSO replica was found"));
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 120d748..73d10dd 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -18,9 +18,11 @@
package org.apache.omid.tso;
import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.InMemoryCommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.tso.PersistenceProcessorImpl.Batch;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -33,14 +35,17 @@ import java.io.IOException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
@SuppressWarnings({"UnusedDeclaration"})
public class TestPersistenceProcessor {
@@ -100,6 +105,9 @@ public class TestPersistenceProcessor {
TSOServerConfig tsoServerConfig = new TSOServerConfig();
tsoServerConfig.setBatchPersistTimeoutInMs(100);
+
+ Batch batch = spy(new Batch(1));
+
// Component under test
PersistenceProcessor proc = new PersistenceProcessorImpl(tsoServerConfig,
metrics,
@@ -114,58 +122,118 @@ public class TestPersistenceProcessor {
// The non-ha lease manager always return true for
// stillInLeasePeriod(), so verify the batch sends replies as master
MonitoringContext monCtx = new MonitoringContext(metrics);
- proc.persistCommit(1, 2, null, monCtx);
+ proc.persistCommit(0, 1, null, monCtx);
verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
- verify(batch, timeout(1000).times(2)).sendRepliesAndReset(any(ReplyProcessor.class),
- any(RetryProcessor.class),
- eq(true));
+ verify(batch, timeout(1000).times(1)).sendRepliesAndReset(any(ReplyProcessor.class),
+ any(RetryProcessor.class));
}
@Test
public void testCommitPersistenceWithHALeaseManager() throws Exception {
- // Init a HA lease manager
- LeaseManager leaseManager = mock(LeaseManager.class);
-
TSOServerConfig tsoServerConfig = new TSOServerConfig();
tsoServerConfig.setBatchPersistTimeoutInMs(100);
+
+ MonitoringContext monCtx = new MonitoringContext(metrics);
+
+ // Test 1: Configure the lease manager to always return true for stillInLeasePeriod, so verify the batch sends
+ // replies as master
+
+ // Init stuff
+ ArgumentCaptor<String> msg = ArgumentCaptor.forClass(String.class);
+ Panicker panicker = spy(new RuntimeExceptionPanicker());
+ LeaseManager leaseManager = mock(LeaseManager.class);
+ Batch batch = spy(new Batch(1));
// Component under test
PersistenceProcessor proc = new PersistenceProcessorImpl(tsoServerConfig,
metrics,
"localhost:1234",
batch,
leaseManager,
- commitTable,
+ new InMemoryCommitTable(),
replyProcessor,
retryProcessor,
panicker);
- // Configure the lease manager to always return true for
- // stillInLeasePeriod, so verify the batch sends replies as master
doReturn(true).when(leaseManager).stillInLeasePeriod();
- MonitoringContext monCtx = new MonitoringContext(metrics);
proc.persistCommit(1, 2, null, monCtx);
verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
- verify(batch).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class), eq(true));
+ verify(batch).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class));
+ verify(panicker, never()).panic(msg.capture(), any(Throwable.class));
- // Configure the lease manager to always return true first and false
- // later for stillInLeasePeriod, so verify the batch sends replies as
- // non-master
+ // Test 2: Configure the lease manager to return true first and false later for stillInLeasePeriod, so verify
+ // the batch sends no replies and the TSO panics (commits suicide) after losing mastership
+
+ // Reset stuff
reset(leaseManager);
- reset(batch);
+ batch = spy(new Batch(1));
+ msg = ArgumentCaptor.forClass(String.class);
+ panicker = spy(new RuntimeExceptionPanicker());
+ // Component under test
+ proc = new PersistenceProcessorImpl(tsoServerConfig,
+ metrics,
+ "localhost:1234",
+ batch,
+ leaseManager,
+ new InMemoryCommitTable(),
+ replyProcessor,
+ retryProcessor,
+ panicker);
doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
proc.persistCommit(1, 2, null, monCtx);
verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
- verify(batch).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class), eq(false));
+ verify(batch, never()).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class));
+ verify(panicker).panic(msg.capture());
+ assertTrue(msg.getValue().contains("Committing suicide"));
+
+ // Test 3: Configure the lease manager to always return false for stillInLeasePeriod, so verify the batch
+ // sends no replies and the TSO panics (commits suicide)
- // Configure the lease manager to always return false for
- // stillInLeasePeriod, so verify the batch sends replies as non-master
+ // Reset stuff
reset(leaseManager);
- reset(batch);
+ batch = spy(new Batch(1));
+ msg = ArgumentCaptor.forClass(String.class);
+ panicker = spy(new RuntimeExceptionPanicker());
+ // Component under test
+ proc = new PersistenceProcessorImpl(tsoServerConfig,
+ metrics,
+ "localhost:1234",
+ batch,
+ leaseManager,
+ commitTable,
+ replyProcessor,
+ retryProcessor,
+ panicker);
doReturn(false).when(leaseManager).stillInLeasePeriod();
proc.persistCommit(1, 2, null, monCtx);
verify(leaseManager, timeout(1000).times(1)).stillInLeasePeriod();
- verify(batch).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class), eq(false));
+ verify(batch, never()).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class));
+ verify(panicker).panic(msg.capture());
+ assertTrue(msg.getValue().contains("Committing suicide"));
+
+ // Test 4: Verify that a failure while flushing data to the database triggers a panic and no replies are sent
+
+ // Reset stuff
+ reset(leaseManager);
+ batch = spy(new Batch(1));
+ msg = ArgumentCaptor.forClass(String.class);
+ panicker = spy(new RuntimeExceptionPanicker());
+ // Component under test
+ proc = new PersistenceProcessorImpl(tsoServerConfig,
+ metrics,
+ "localhost:1234",
+ batch,
+ leaseManager,
+ commitTable, // Commit table writer will fail and launch a panic
+ replyProcessor,
+ retryProcessor,
+ panicker);
+ doReturn(true).when(leaseManager).stillInLeasePeriod();
+ proc.persistCommit(1, 2, null, monCtx);
+ verify(leaseManager, timeout(1000).times(1)).stillInLeasePeriod();
+ verify(batch, never()).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class));
+ verify(panicker).panic(msg.capture(), isA(IOException.class));
+ assertEquals(msg.getValue(), "Error persisting commit batch");
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index 5cc629e..d971e6c 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -88,7 +88,7 @@ public class TestRetryProcessor {
retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
ArgumentCaptor<Long> secondTScapture = ArgumentCaptor.forClass(Long.class);
verify(replyProc, timeout(100).times(1))
- .commitResponse(eq(false), firstTScapture.capture(), secondTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ .commitResponse(firstTScapture.capture(), secondTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
startTS = firstTScapture.getValue();
long commitTS = secondTScapture.getValue();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
index b80a0a3..2167fa7 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -357,7 +357,6 @@ public class TestTSOClientRequestAndResponseBehaviours {
clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
- assertFalse(response.getCommitResponse().getMakeHeuristicDecision());
assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + 1);
}
@@ -374,7 +373,6 @@ public class TestTSOClientRequestAndResponseBehaviours {
clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
- assertFalse(response.getCommitResponse().getMakeHeuristicDecision());
assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
}
@@ -393,7 +391,6 @@ public class TestTSOClientRequestAndResponseBehaviours {
TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");
- assertFalse(response.getCommitResponse().getMakeHeuristicDecision());
assertEquals(response.getCommitResponse().getCommitTimestamp(), 0);
}
// ----------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/55e6644f/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java
index d485772..2f6463d 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java
@@ -87,25 +87,10 @@ public class TestTSOClientResponseHandling {
// side returns a commit timestamp
// Program the TSO to return an Commit response (with no required heuristic actions)
- tsoServer.queueResponse(new CommitResponse(false, START_TS, COMMIT_TS));
+ tsoServer.queueResponse(new CommitResponse(START_TS, COMMIT_TS));
long commitTS = tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
assertEquals(commitTS, COMMIT_TS);
}
- @Test
- public void testCommitRequestReceivingAHeuristicResponse() throws Exception {
- // test commit request which needs heuristic actions from the client
- // throws an execution exception with a NewTSOException as a cause
-
- // Program the TSO to return an Commit response requiring heuristic actions
- tsoServer.queueResponse(new CommitResponse(true, START_TS, COMMIT_TS));
- try {
- tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
- } catch (ExecutionException ee) {
- assertEquals(ee.getCause().getClass(), NewTSOException.class);
- }
-
- }
-
}