You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by oh...@apache.org on 2017/08/03 14:16:18 UTC

incubator-omid git commit: [OMID-73] A checkpoint is a point in a transaction where every write occurs after the checkpoint may not be visible by the transaction. The visibility is defined by the following visibility levels: SNAPSHOT - returns the last w

Repository: incubator-omid
Updated Branches:
  refs/heads/master 0cae2ff18 -> 1f83aeda9


[OMID-73] A checkpoint is a point in a transaction where every write occurs after the checkpoint may not be visible by the transaction. The visibility is defined by the following visibility levels: SNAPSHOT - returns the last written key by the transaction, in case one exists, otherwise, returns the key from the transaction snapshot. SNAPSHOT_EXCLUDE_CURRENT - returns the last written key by the transaction that was written before the last checkpoint, in case one exists, otherwise, returns the key from the transaction snapshot. SNAPSHOT_ALL - returns all the versions written by the transaction including the one in the transaction snapshot. This feature is needed for Phoenix both for upsert operations and for correct rollback of the secondary index on transaction abort. Explanations for this feature can be seen in [TEPHRA-96].

Signed-off-by: Ohad Shacham <oh...@yahoo-inc.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/1f83aeda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/1f83aeda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/1f83aeda

Branch: refs/heads/master
Commit: 1f83aeda963472c861b4b622c08b3c8c725fbd16
Parents: 0cae2ff
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Thu Aug 3 16:45:59 2017 +0300
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Thu Aug 3 16:50:57 2017 +0300

----------------------------------------------------------------------
 .../transaction/HBaseSyncPostCommitter.java     |   2 +-
 .../omid/transaction/HBaseTransaction.java      |   3 +-
 .../transaction/HBaseTransactionManager.java    |   4 +-
 .../org/apache/omid/transaction/TTable.java     |  62 ++--
 .../apache/omid/transaction/TestCheckpoint.java | 320 +++++++++++++++++++
 .../transaction/TestHBaseTransactionClient.java |   4 +-
 .../TestHBaseTransactionManager.java            |   2 +-
 .../omid/transaction/AbstractTransaction.java   |  78 ++++-
 .../transaction/AbstractTransactionManager.java |   2 +
 tso-server/pom.xml                              |   1 -
 .../apache/omid/tso/TimestampOracleImpl.java    |  10 +-
 .../apache/omid/tso/WorldClockOracleImpl.java   |   5 +-
 .../default-omid-server-configuration.yml       |   1 -
 .../apache/omid/tso/TestRequestProcessor.java   |  20 +-
 .../apache/omid/tso/TestTimestampOracle.java    |   5 +-
 .../apache/omid/tso/TestWorldTimeOracle.java    |   2 +
 ...tionOfTSOClientServerBasicFunctionality.java |  14 +-
 .../client/TestTSOClientConnectionToTSO.java    |   8 +-
 ...stTSOClientRequestAndResponseBehaviours.java |   4 +-
 19 files changed, 494 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index 952d067..06e5c89 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -66,7 +66,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
                 Put put = new Put(cell.getRow());
                 put.add(cell.getFamily(),
                         CellUtils.addShadowCellSuffix(cell.getQualifier(), 0, cell.getQualifier().length),
-                        tx.getStartTimestamp(),
+                        cell.getTimestamp(),
                         Bytes.toBytes(tx.getCommitTimestamp()));
                 try {
                     cell.getTable().put(put);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
index aff08e1..3ca24ed 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
@@ -19,6 +19,7 @@ package org.apache.omid.transaction;
 
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +40,7 @@ public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
         Set<HBaseCellId> writeSet = getWriteSet();
         for (final HBaseCellId cell : writeSet) {
             Delete delete = new Delete(cell.getRow());
-            delete.deleteColumn(cell.getFamily(), cell.getQualifier(), getStartTimestamp());
+            delete.deleteColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
             try {
                 cell.getTable().delete(delete);
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
index f11ef03..484eb19 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
@@ -20,7 +20,6 @@ package org.apache.omid.transaction;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -209,8 +208,9 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
     @Override
     public boolean isCommitted(HBaseCellId hBaseCellId) throws TransactionException {
         try {
+            long timestamp = hBaseCellId.getTimestamp() - (hBaseCellId.getTimestamp() % AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
             CommitTimestamp tentativeCommitTimestamp =
-                    locateCellCommitTimestamp(hBaseCellId.getTimestamp(), tsoClient.getEpoch(),
+                    locateCellCommitTimestamp(timestamp, tsoClient.getEpoch(),
                                               new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap()));
 
             // If transaction that added the cell was invalidated

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index 9b82148..44a1a82 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.omid.committable.CommitTable.CommitTimestamp;
+import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
 import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,7 +133,7 @@ public class TTable implements Closeable {
 
         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
 
-        final long readTimestamp = transaction.getStartTimestamp();
+        final long readTimestamp = transaction.getReadTimestamp();
         final Get tsget = new Get(get.getRow()).setFilter(get.getFilter());
         TimeRange timeRange = get.getTimeRange();
         long startTime = timeRange.getMin();
@@ -177,10 +178,10 @@ public class TTable implements Closeable {
 
         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
 
-        final long startTimestamp = transaction.getStartTimestamp();
+        final long writeTimestamp = transaction.getWriteTimestamp();
         boolean issueGet = false;
 
-        final Put deleteP = new Put(delete.getRow(), startTimestamp);
+        final Put deleteP = new Put(delete.getRow(), writeTimestamp);
         final Get deleteG = new Get(delete.getRow());
         Map<byte[], List<Cell>> fmap = delete.getFamilyCellMap();
         if (fmap.isEmpty()) {
@@ -188,19 +189,19 @@ public class TTable implements Closeable {
         }
         for (List<Cell> cells : fmap.values()) {
             for (Cell cell : cells) {
-                CellUtils.validateCell(cell, startTimestamp);
+                CellUtils.validateCell(cell, writeTimestamp);
                 switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
                     case DeleteColumn:
                         deleteP.add(CellUtil.cloneFamily(cell),
                                     CellUtil.cloneQualifier(cell),
-                                    startTimestamp,
+                                    writeTimestamp,
                                     CellUtils.DELETE_TOMBSTONE);
                         transaction.addWriteSetElement(
                             new HBaseCellId(table,
                                             delete.getRow(),
                                             CellUtil.cloneFamily(cell),
                                             CellUtil.cloneQualifier(cell),
-                                            cell.getTimestamp()));
+                                            writeTimestamp));
                         break;
                     case DeleteFamily:
                         deleteG.addFamily(CellUtil.cloneFamily(cell));
@@ -210,14 +211,14 @@ public class TTable implements Closeable {
                         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
                             deleteP.add(CellUtil.cloneFamily(cell),
                                         CellUtil.cloneQualifier(cell),
-                                        startTimestamp,
+                                        writeTimestamp,
                                         CellUtils.DELETE_TOMBSTONE);
                             transaction.addWriteSetElement(
                                 new HBaseCellId(table,
                                                 delete.getRow(),
                                                 CellUtil.cloneFamily(cell),
                                                 CellUtil.cloneQualifier(cell),
-                                                cell.getTimestamp()));
+                                                writeTimestamp));
                             break;
                         } else {
                             throw new UnsupportedOperationException(
@@ -240,7 +241,7 @@ public class TTable implements Closeable {
                         byte[] qualifier = entryQ.getKey();
                         deleteP.add(family, qualifier, CellUtils.DELETE_TOMBSTONE);
                         transaction.addWriteSetElement(new HBaseCellId(table, delete.getRow(), family, qualifier,
-                                                                       transaction.getStartTimestamp()));
+                                writeTimestamp));
                     }
                 }
             }
@@ -265,18 +266,19 @@ public class TTable implements Closeable {
 
         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
 
-        final long startTimestamp = transaction.getStartTimestamp();
+        final long writeTimestamp = transaction.getWriteTimestamp();
+
         // create put with correct ts
-        final Put tsput = new Put(put.getRow(), startTimestamp);
+        final Put tsput = new Put(put.getRow(), writeTimestamp);
         Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
         for (List<Cell> kvl : kvs.values()) {
             for (Cell c : kvl) {
-                CellUtils.validateCell(c, startTimestamp);
+                CellUtils.validateCell(c, writeTimestamp);
                 // Reach into keyvalue to update timestamp.
                 // It's not nice to reach into keyvalue internals,
                 // but we want to avoid having to copy the whole thing
                 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
-                Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), startTimestamp);
+                Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), writeTimestamp);
                 tsput.add(kv);
 
                 transaction.addWriteSetElement(
@@ -307,7 +309,7 @@ public class TTable implements Closeable {
 
         Scan tsscan = new Scan(scan);
         tsscan.setMaxVersions(1);
-        tsscan.setTimeRange(0, transaction.getStartTimestamp() + 1);
+        tsscan.setTimeRange(0, transaction.getReadTimestamp() + 1);
         Map<byte[], NavigableSet<byte[]>> kvs = scan.getFamilyMap();
         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
             byte[] family = entry.getKey();
@@ -351,12 +353,20 @@ public class TTable implements Closeable {
             boolean snapshotValueFound = false;
             Cell oldestCell = null;
             for (Cell cell : columnCells) {
-                if (isCellInSnapshot(cell, transaction, commitCache)) {
+                if (isCellInTransaction(cell, transaction, commitCache) ||
+                    isCellInSnapshot(cell, transaction, commitCache)) {
                     if (!CellUtil.matchingValue(cell, CellUtils.DELETE_TOMBSTONE)) {
                         keyValuesInSnapshot.add(cell);
                     }
-                    snapshotValueFound = true;
-                    break;
+
+                    // We can finish looking for additional results in two cases:
+                    // 1. if we found a result and we are not in SNAPSHOT_ALL mode.
+                    // 2. if we found a result that was not written by the current transaction.
+                    if (transaction.getVisibilityLevel() != VisibilityLevel.SNAPSHOT_ALL ||
+                        !isCellInTransaction(cell, transaction, commitCache)) {
+                        snapshotValueFound = true;
+                        break;
+                    }
                 }
                 oldestCell = cell;
             }
@@ -379,7 +389,6 @@ public class TTable implements Closeable {
 
         Collections.sort(keyValuesInSnapshot, KeyValue.COMPARATOR);
 
-        assert (keyValuesInSnapshot.size() <= rawCells.size());
         return keyValuesInSnapshot;
     }
 
@@ -396,15 +405,26 @@ public class TTable implements Closeable {
         return commitCache;
     }
 
-    private boolean isCellInSnapshot(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache)
-        throws IOException {
+    private boolean isCellInTransaction(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache) {
 
         long startTimestamp = transaction.getStartTimestamp();
+        long readTimestamp = transaction.getReadTimestamp();
 
-        if (kv.getTimestamp() == startTimestamp) {
+        // A cell was written by a transaction if its timestamp is larger than its startTimestamp and smaller or equal to its readTimestamp.
+        // There also might be a case where the cell was written by the transaction and its timestamp equals to its writeTimestamp, however,
+        // this case occurs after checkpoint and in this case we do not want to read this data.
+        if (kv.getTimestamp() >= startTimestamp && kv.getTimestamp() <= readTimestamp) {
             return true;
         }
 
+        return false;
+    }
+
+    private boolean isCellInSnapshot(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache)
+        throws IOException {
+
+        long startTimestamp = transaction.getStartTimestamp();
+
         Optional<Long> commitTimestamp =
             tryToLocateCellCommitTimestamp(transaction.getTransactionManager(), transaction.getEpoch(), kv,
                                            commitCache);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java
new file mode 100644
index 0000000..078ea5f
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+@Test(groups = "sharedHBase")
+public class TestCheckpoint extends OmidTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestCheckpoint.class);
+
+    private HBaseTransaction enforceHBaseTransactionAsParam(Transaction tx) {
+        if (tx instanceof HBaseTransaction) {
+            return (HBaseTransaction) tx;
+        } else {
+            throw new IllegalArgumentException(
+                String.format("The transaction object passed %s is not an instance of HBaseTransaction",
+                              tx.getClass().getName()));
+        }
+    }
+
+    @Test(timeOut = 30_000)
+    public void testFewCheckPoints(ITestContext context) throws Exception {
+
+        TransactionManager tm = newTransactionManager(context);
+        TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+        byte[] rowName1 = Bytes.toBytes("row1");
+        byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
+        byte[] colName1 = Bytes.toBytes("col1");
+        byte[] dataValue1 = Bytes.toBytes("testWrite-1");
+        byte[] dataValue2 = Bytes.toBytes("testWrite-2");
+        byte[] dataValue3 = Bytes.toBytes("testWrite-3");
+
+        Transaction tx1 = tm.begin();
+
+        HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
+
+        Put row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue1);
+        tt.put(tx1, row1);
+
+        Get g = new Get(rowName1).setMaxVersions(1);
+
+        Result r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.checkpoint();
+
+        row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue2);
+        tt.put(tx1, row1);
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT);
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.checkpoint();
+
+        row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue3);
+        tt.put(tx1, row1);
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.checkpoint();
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue3, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_ALL);
+
+        r = tt.get(tx1, g);
+        
+        assertTrue(r.size() == 3, "Expected 3 results and found " + r.size());
+
+        List<Cell> cells = r.getColumnCells(famName1, colName1);
+        assertTrue(Bytes.equals(dataValue3, cells.get(0).getValue()),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        assertTrue(Bytes.equals(dataValue2, cells.get(1).getValue()),
+              "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        assertTrue(Bytes.equals(dataValue1, cells.get(2).getValue()),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        tt.close();
+    }
+
+    @Test(timeOut = 30_000)
+    public void testSNAPSHOT(ITestContext context) throws Exception {
+        TransactionManager tm = newTransactionManager(context);
+        TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+        byte[] rowName1 = Bytes.toBytes("row1");
+        byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
+        byte[] colName1 = Bytes.toBytes("col1");
+        byte[] dataValue0 = Bytes.toBytes("testWrite-0");
+        byte[] dataValue1 = Bytes.toBytes("testWrite-1");
+        byte[] dataValue2 = Bytes.toBytes("testWrite-2");
+
+        Transaction tx1 = tm.begin();
+
+        Put row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue0);
+        tt.put(tx1, row1);
+
+        tm.commit(tx1);
+
+        tx1 = tm.begin();
+
+        HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
+
+        Get g = new Get(rowName1).setMaxVersions(1);
+
+        Result r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue0, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue1);
+        tt.put(tx1, row1);
+
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.checkpoint();
+
+        row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue2);
+        tt.put(tx1, row1);
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT);
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        tt.close();
+    }
+    
+    @Test(timeOut = 30_000)
+    public void testSNAPSHOT_ALL(ITestContext context) throws Exception {
+        TransactionManager tm = newTransactionManager(context);
+        TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+        byte[] rowName1 = Bytes.toBytes("row1");
+        byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
+        byte[] colName1 = Bytes.toBytes("col1");
+        byte[] dataValue0 = Bytes.toBytes("testWrite-0");
+        byte[] dataValue1 = Bytes.toBytes("testWrite-1");
+        byte[] dataValue2 = Bytes.toBytes("testWrite-2");
+
+        Transaction tx1 = tm.begin();
+
+        Put row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue0);
+        tt.put(tx1, row1);
+
+        tm.commit(tx1);
+
+        tx1 = tm.begin();
+        
+        HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
+
+        Get g = new Get(rowName1).setMaxVersions(100);
+
+        Result r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue0, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue1);
+        tt.put(tx1, row1);
+
+        g = new Get(rowName1).setMaxVersions(100);
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.checkpoint();
+
+        row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue2);
+        tt.put(tx1, row1);
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_ALL);
+
+        r = tt.get(tx1, g);
+        
+        assertTrue(r.size() == 3, "Expected 3 results and found " + r.size());
+
+        List<Cell> cells = r.getColumnCells(famName1, colName1);
+        assertTrue(Bytes.equals(dataValue2, cells.get(0).getValue()),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        assertTrue(Bytes.equals(dataValue1, cells.get(1).getValue()),
+              "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        assertTrue(Bytes.equals(dataValue0, cells.get(2).getValue()),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        tt.close();
+    }
+
+    @Test(timeOut = 30_000)
+    public void testSNAPSHOT_EXCLUDE_CURRENT(ITestContext context) throws Exception {
+        TransactionManager tm = newTransactionManager(context);
+        TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+        byte[] rowName1 = Bytes.toBytes("row1");
+        byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
+        byte[] colName1 = Bytes.toBytes("col1");
+        byte[] dataValue1 = Bytes.toBytes("testWrite-1");
+        byte[] dataValue2 = Bytes.toBytes("testWrite-2");
+
+        Transaction tx1 = tm.begin();
+
+        HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
+
+        Put row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue1);
+        tt.put(tx1, row1);
+
+        Get g = new Get(rowName1).setMaxVersions(1);
+
+        Result r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.checkpoint();
+
+        row1 = new Put(rowName1);
+        row1.add(famName1, colName1, dataValue2);
+        tt.put(tx1, row1);
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+
+        hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+
+        r = tt.get(tx1, g);
+        assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
+                "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
+        
+        tt.close();
+    }
+
+    @Test(timeOut = 30_000)
+    public void testOutOfCheckpoints(ITestContext context) throws Exception {
+        TransactionManager tm = newTransactionManager(context);
+
+        Transaction tx1 = tm.begin();
+
+        HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
+
+        for (int i=0; i < AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN - 1; ++i) {
+            hbaseTx1.checkpoint();
+        }
+
+        try {
+            hbaseTx1.checkpoint();
+            Assert.fail();
+        } catch (TransactionException e) {
+            // expected
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
index c349657..8efdaa6 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
@@ -268,7 +268,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
             CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
                     ctLocator);
             assertTrue(ct.isValid());
-            long expectedCommitTS = tx1.getStartTimestamp() + 1;
+            long expectedCommitTS = tx1.getStartTimestamp() + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
             assertEquals(ct.getValue(), expectedCommitTS);
             assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
         }
@@ -308,7 +308,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
     @Test(timeOut = 30_000)
     public void testCellFromTransactionInPreviousEpochGetsInvalidComitTimestamp(ITestContext context) throws Exception {
 
-        final long CURRENT_EPOCH_FAKE = 1000L;
+        final long CURRENT_EPOCH_FAKE = 1000L * AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
 
         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
         AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java
index 03187ea..347c4ce 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java
@@ -52,7 +52,7 @@ public class TestHBaseTransactionManager extends OmidTestBase {
 
         TSOClient tsoClient = spy(getClient(context));
 
-        long fakeEpoch = tsoClient.getNewStartTimestamp().get() + FAKE_EPOCH_INCREMENT;
+        long fakeEpoch = tsoClient.getNewStartTimestamp().get() + (FAKE_EPOCH_INCREMENT * AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
 
         // Modify the epoch before testing the begin method
         doReturn(fakeEpoch).when(tsoClient).getEpoch();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java
index 500c1e2..2ffbecd 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java
@@ -18,6 +18,7 @@
 package org.apache.omid.transaction;
 
 import com.google.common.base.Optional;
+
 import org.apache.omid.tso.client.CellId;
 
 import java.util.ArrayList;
@@ -37,14 +38,28 @@ import java.util.Set;
  */
 public abstract class AbstractTransaction<T extends CellId> implements Transaction {
 
+    enum VisibilityLevel {
+        // Regular snapshot isolation. Returns the last key, either from the snapshot or from the current transaction
+        // Sets the readTimestamp to be the writeTimestamp
+        SNAPSHOT,
+        // Returns all the written version of a key X that written by the transaction and the key X from the provided snapshot.
+        SNAPSHOT_ALL,
+        // Returns the last key, either from the snapshot or from the current transaction that was written before the last checkpoint.
+        // Sets the readTimestamp to be the writeTimestamp - 1
+        SNAPSHOT_EXCLUDE_CURRENT
+    }
+
     private transient Map<String, Object> metadata = new HashMap<>();
     private final AbstractTransactionManager transactionManager;
     private final long startTimestamp;
+    protected long readTimestamp;
+    protected long writeTimestamp;
     private final long epoch;
     private long commitTimestamp;
     private boolean isRollbackOnly;
     private final Set<T> writeSet;
     private Status status = Status.RUNNING;
+    private VisibilityLevel visibilityLevel;
 
     /**
      * Base constructor
@@ -66,10 +81,27 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
                                long epoch,
                                Set<T> writeSet,
                                AbstractTransactionManager transactionManager) {
-        this.startTimestamp = transactionId;
+        this.startTimestamp = this.readTimestamp = this.writeTimestamp = transactionId;
         this.epoch = epoch;
         this.writeSet = writeSet;
         this.transactionManager = transactionManager;
+        visibilityLevel = VisibilityLevel.SNAPSHOT;
+    }
+
+    /**
+     * Creates a checkpoint and sets the visibility level to SNAPSHOT_EXCLUDE_CURRENT
+     * The number of checkpoints is bounded to NUM_CHECKPOINTS in order to make checkpoint a client side operation
+     * @return true if a checkpoint was created and false otherwise
+     * @throws TransactionException
+     */
+    void checkpoint() throws TransactionException {
+
+        setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+        this.readTimestamp = this.writeTimestamp++;
+
+        if (this.writeTimestamp % AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN == 0) {
+            throw new TransactionException("Error: number of checkpoing cannot exceed " + (AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN - 1));
+        }
     }
 
     /**
@@ -134,6 +166,22 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
     }
 
     /**
+     * Returns the read timestamp for this transaction.
+     * @return read timestamp
+     */
+    public long getReadTimestamp() {
+        return readTimestamp;
+    }
+
+    /**
+     * Returns the write timestamp for this transaction.
+     * @return write timestamp
+     */
+    public long getWriteTimestamp() {
+        return writeTimestamp;
+    }
+
+    /**
      * Returns the commit timestamp for this transaction.
      * @return commit timestamp
      */
@@ -142,6 +190,14 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
     }
 
     /**
+     * Returns the visibility level for this transaction.
+     * @return visibility level
+     */
+    public VisibilityLevel getVisibilityLevel() {
+        return visibilityLevel;
+    }
+
+    /**
      * Sets the commit timestamp for this transaction.
      * @param commitTimestamp
      *            the commit timestamp to set
@@ -151,6 +207,22 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
     }
 
     /**
+     * Sets the visibility level for this transaction.
+     * @param visibilityLevel
+     *            the {@link VisibilityLevel} to set
+     */
+    public void setVisibilityLevel(VisibilityLevel visibilityLevel) {
+        this.visibilityLevel = visibilityLevel;
+
+        // If we are setting visibility level to either SNAPSHOT or SNAPSHOT_ALL
+        // then we should let readTimestamp equals to writeTimestamp
+        if (this.visibilityLevel == VisibilityLevel.SNAPSHOT ||
+            this.visibilityLevel == VisibilityLevel.SNAPSHOT_ALL) {
+            this.readTimestamp = this.writeTimestamp;
+        }
+    }
+
+    /**
      * Sets the status for this transaction.
      * @param status
      *            the {@link Status} to set
@@ -178,10 +250,12 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
 
     @Override
     public String toString() {
-        return String.format("Tx-%s [%s] (ST=%d, CT=%d, Epoch=%d) WriteSet %s",
+        return String.format("Tx-%s [%s] (ST=%d, RT=%d, WT=%d, CT=%d, Epoch=%d) WriteSet %s",
                              Long.toHexString(getTransactionId()),
                              status,
                              startTimestamp,
+                             readTimestamp,
+                             writeTimestamp,
                              commitTimestamp,
                              epoch,
                              writeSet);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index 0ff0b43..77ccb20 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -58,6 +58,8 @@ public abstract class AbstractTransactionManager implements TransactionManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionManager.class);
 
+    public final static int MAX_CHECKPOINTS_PER_TXN = 50;
+
     public interface TransactionFactory<T extends CellId> {
 
         AbstractTransaction<T> createTransaction(long transactionId, long epoch, AbstractTransactionManager tm);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/pom.xml
----------------------------------------------------------------------
diff --git a/tso-server/pom.xml b/tso-server/pom.xml
index 88ec145..e1488c1 100644
--- a/tso-server/pom.xml
+++ b/tso-server/pom.xml
@@ -58,7 +58,6 @@
             <groupId>org.apache.omid</groupId>
             <artifactId>omid-transaction-client</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
 
         <!-- End of Dependencies on Omid modules -->

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
index 0a65c01..454526f 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
@@ -19,14 +19,17 @@ package org.apache.omid.tso;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.omid.metrics.Gauge;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.timestamp.storage.TimestampStorage;
+import org.apache.omid.transaction.AbstractTransactionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
+
 import java.io.IOException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -134,9 +137,12 @@ public class TimestampOracleImpl implements TimestampOracle {
     @SuppressWarnings("StatementWithEmptyBody")
     @Override
     public long next() {
-        lastTimestamp++;
+        lastTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
 
-        if (lastTimestamp == nextAllocationThreshold) {
+        if (lastTimestamp >= nextAllocationThreshold) {
+            // set the nextAllocationThread to max value of long in order to
+            // make sure only one call to this function will execute a thread to extend the timestamp batch.
+            nextAllocationThreshold = Long.MAX_VALUE; 
             executor.execute(allocateTimestampsBatchTask);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java b/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java
index 454da7e..4a9c5b5 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.omid.metrics.Gauge;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.timestamp.storage.TimestampStorage;
+import org.apache.omid.transaction.AbstractTransactionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,8 +131,10 @@ public class WorldClockOracleImpl implements TimestampOracle {
 
         long currentMsFirstTimestamp = System.currentTimeMillis() * MAX_TX_PER_MS;
 
+        lastTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
+
         // Return the next timestamp in case we are still in the same millisecond as the previous timestamp was. 
-        if (++lastTimestamp >= currentMsFirstTimestamp) {
+        if (lastTimestamp >= currentMsFirstTimestamp) {
             return lastTimestamp;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/src/main/resources/default-omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml
index 9c94488..4e45122 100644
--- a/tso-server/src/main/resources/default-omid-server-configuration.yml
+++ b/tso-server/src/main/resources/default-omid-server-configuration.yml
@@ -30,7 +30,6 @@ batchPersistTimeoutInMs: 10
 # INCREMENTAL - [Default] regular counter
 # WORLD_TIME - world time based counter
 timestampType: INCREMENTAL
-
 # Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables)
 timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ]
 commitTableStoreModule: !!org.apache.omid.tso.InMemoryCommitTableStorageModule [ ]

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 4e32235..1c44d05 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.SettableFuture;
 
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.transaction.AbstractTransactionManager;
 import org.jboss.netty.channel.Channel;
 import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
@@ -64,6 +65,9 @@ public class TestRequestProcessor {
         // Build the required scaffolding for the test
         MetricsRegistry metrics = new NullMetricsProvider();
 
+        TSOServerConfig config = new TSOServerConfig();
+        config.setConflictMapSize(CONFLICT_MAP_SIZE);
+
         TimestampOracleImpl timestampOracle =
                 new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
 
@@ -74,9 +78,6 @@ public class TestRequestProcessor {
         f.set(null);
         doReturn(f).when(persist).persistLowWatermark(any(Long.class));
 
-        TSOServerConfig config = new TSOServerConfig();
-        config.setConflictMapSize(CONFLICT_MAP_SIZE);
-
         requestProc = new RequestProcessorImpl(metrics, timestampOracle, persist, new MockPanicker(), config);
 
         // Initialize the state for the experiment
@@ -97,7 +98,8 @@ public class TestRequestProcessor {
         // verify that timestamps increase monotonically
         for (int i = 0; i < 100; i++) {
             requestProc.timestampRequest(null, new MonitoringContext(metrics));
-            verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
+            verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS), any(Channel.class), any(MonitoringContext.class));
+            firstTS += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
         }
 
     }
@@ -112,8 +114,8 @@ public class TestRequestProcessor {
         long firstTS = TScapture.getValue();
 
         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
-        requestProc.commitRequest(firstTS - 1, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContext.class));
+        requestProc.commitRequest(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN), any(Channel.class), any(MonitoringContext.class));
 
         requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
@@ -178,8 +180,8 @@ public class TestRequestProcessor {
     public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exception {
 
         final int ANY_START_TS = 1;
-        final long FIRST_COMMIT_TS_EVICTED = 1L;
-        final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = 2L;
+        final long FIRST_COMMIT_TS_EVICTED = AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
+        final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = FIRST_COMMIT_TS_EVICTED + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
 
         // Fill the cache to provoke a cache eviction
         for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
@@ -192,7 +194,7 @@ public class TestRequestProcessor {
 
         // Check that first time its called is on init
         verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
-        // Then, check it is called when cache is full and the first element is evicted (should be a 1)
+        // Then, check it is called when cache is full and the first element is evicted (should be a AbstractTransactionManager.NUM_OF_CHECKPOINTS)
         verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
         // Finally it should never be called with the next element
         verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
index c75e95b..a5f236c 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
@@ -19,6 +19,7 @@ package org.apache.omid.tso;
 
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.timestamp.storage.TimestampStorage;
+import org.apache.omid.transaction.AbstractTransactionManager;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -51,6 +52,8 @@ public class TestTimestampOracle {
     private Panicker panicker;
     @Mock
     private TimestampStorage timestampStorage;
+    @Mock
+    TSOServerConfig config;
 
     // Component under test
     @InjectMocks
@@ -70,7 +73,7 @@ public class TestTimestampOracle {
         long last = timestampOracle.next();
         for (int i = 0; i < (3 * TimestampOracleImpl.TIMESTAMP_BATCH); i++) {
             long current = timestampOracle.next();
-            assertEquals(current, last + 1, "Not monotonic growth");
+            assertEquals(current, last + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, "Not monotonic growth");
             last = current;
         }
         assertTrue(timestampOracle.getLast() == last);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
index 825646c..df59530 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
@@ -51,6 +51,8 @@ public class TestWorldTimeOracle {
     private Panicker panicker;
     @Mock
     private TimestampStorage timestampStorage;
+    @Mock
+    private TSOServerConfig config;
 
     // Component under test
     @InjectMocks

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
index bab7e27..c4c9c61 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
@@ -21,8 +21,10 @@ import com.google.common.collect.Sets;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
+
 import org.apache.omid.TestUtils;
 import org.apache.omid.committable.CommitTable;
+import org.apache.omid.transaction.AbstractTransactionManager;
 import org.apache.omid.tso.TSOMockModule;
 import org.apache.omid.tso.TSOServer;
 import org.apache.omid.tso.TSOServerConfig;
@@ -123,17 +125,21 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
         referenceTimestamp = startTsTx1;
 
         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
-        assertEquals(startTsTx2, ++referenceTimestamp, "Should grow monotonically");
+        referenceTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
+        assertEquals(startTsTx2, referenceTimestamp, "Should grow monotonically");
         assertTrue(startTsTx2 > startTsTx1, "Two timestamps obtained consecutively should grow");
 
         long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1)).get();
-        assertEquals(commitTsTx2, ++referenceTimestamp, "Should grow monotonically");
+        referenceTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
+        assertEquals(commitTsTx2, referenceTimestamp, "Should grow monotonically");
 
         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c2)).get();
-        assertEquals(commitTsTx1, ++referenceTimestamp, "Should grow monotonically");
+        referenceTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
+        assertEquals(commitTsTx1, referenceTimestamp, "Should grow monotonically");
 
         long startTsTx3 = tsoClient.getNewStartTimestamp().get();
-        assertEquals(startTsTx3, ++referenceTimestamp, "Should grow monotonically");
+        referenceTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
+        assertEquals(startTsTx3, referenceTimestamp, "Should grow monotonically");
     }
 
     @Test(timeOut = 30_000)

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
index 2650e0e..26030b9 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
@@ -19,10 +19,12 @@ package org.apache.omid.tso.client;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.omid.TestUtils;
+import org.apache.omid.transaction.AbstractTransactionManager;
 import org.apache.omid.tso.HALeaseManagementModule;
 import org.apache.omid.tso.TSOMockModule;
 import org.apache.omid.tso.TSOServer;
@@ -137,7 +139,7 @@ public class TestTSOClientConnectionToTSO {
         // ... so we should get responses from the methods
         Long startTS = tsoClient.getNewStartTimestamp().get();
         LOG.info("Start TS {} ", startTS);
-        assertEquals(startTS.longValue(), 1);
+        assertEquals(startTS.longValue(), AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
 
         // Close the tsoClient connection and stop the TSO Server
         tsoClient.close().get();
@@ -175,7 +177,7 @@ public class TestTSOClientConnectionToTSO {
         // ... so we should get responses from the methods
         Long startTS = tsoClient.getNewStartTimestamp().get();
         LOG.info("Start TS {} ", startTS);
-        assertEquals(startTS.longValue(), 1);
+        assertEquals(startTS.longValue(), AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
 
         // Close the tsoClient connection and stop the TSO Server
         tsoClient.close().get();
@@ -213,7 +215,7 @@ public class TestTSOClientConnectionToTSO {
         // ... and check that initially we get responses from the methods
         Long startTS = tsoClient.getNewStartTimestamp().get();
         LOG.info("Start TS {} ", startTS);
-        assertEquals(startTS.longValue(), 1);
+        assertEquals(startTS.longValue(), AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
 
         // Then stop the server...
         tsoServer.stopAndWait();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1f83aeda/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
index 1b5dce8..a2da056 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -21,9 +21,11 @@ import com.google.common.collect.Sets;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
+
 import org.apache.omid.TestUtils;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.proto.TSOProto;
+import org.apache.omid.transaction.AbstractTransactionManager;
 import org.apache.omid.tso.PausableTimestampOracle;
 import org.apache.omid.tso.TSOMockModule;
 import org.apache.omid.tso.TSOServer;
@@ -348,7 +350,7 @@ public class TestTSOClientRequestAndResponseBehaviours {
         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
         assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
-        assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + 1);
+        assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
     }
 
     @Test(timeOut = 30_000)