You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/02/05 08:57:02 UTC

[incubator-omid] branch 1.0.1 updated: [OMID-129] - cache uncommitted transactions in TransactionVisibilityFilterBase with Long.MAX_VALUE as commit timestamp

This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git


The following commit(s) were added to refs/heads/1.0.1 by this push:
     new cbe2a39  [OMID-129] - cache uncommitted transactions in TransactionVisibilityFilterBase with Long.MAX_VALUE as commit timestamp
cbe2a39 is described below

commit cbe2a391406e4c9771358029700fcc7a9e795a84
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Tue Feb 5 10:56:36 2019 +0200

    [OMID-129] - cache uncommitted transactions in TransactionVisibilityFilterBase with Long.MAX_VALUE as commit timestamp
---
 hbase-coprocessor/pom.xml                          |  5 ++
 .../TransactionVisibilityFilterBase.java           | 34 +++++---
 .../omid/transaction/TestSnapshotFilter.java       | 93 ++++++++++++++++++++++
 3 files changed, 123 insertions(+), 9 deletions(-)

diff --git a/hbase-coprocessor/pom.xml b/hbase-coprocessor/pom.xml
index 3375db4..48ab6b1 100644
--- a/hbase-coprocessor/pom.xml
+++ b/hbase-coprocessor/pom.xml
@@ -136,6 +136,11 @@
             <artifactId>${shims.artifactId}</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.1</version>
+        </dependency>
         <!-- end testing -->
 
     </dependencies>
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java
index 287d2a2..b3a6baf 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java
@@ -19,17 +19,22 @@ package org.apache.omid.transaction;
 
 import com.google.common.base.Optional;
 
+
+import org.apache.commons.collections4.map.LRUMap;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.omid.OmidFilterBase;
 
+
 import java.io.IOException;
 import java.util.HashMap;
+
 import java.util.List;
 import java.util.Map;
 
@@ -39,7 +44,7 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
     // optional sub-filter to apply to visible cells
     private final Filter userFilter;
     private final SnapshotFilterImpl snapshotFilter;
-    private final Map<Long ,Long> commitCache;
+    private final LRUMap<Long ,Long> commitCache;
     private final HBaseTransaction hbaseTransaction;
 
     // This cache is cleared when moving to the next row
@@ -51,9 +56,10 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
                                            HBaseTransaction hbaseTransaction) {
         this.userFilter = cellFilter;
         this.snapshotFilter = snapshotFilter;
-        commitCache = new HashMap<>();
+        commitCache = new LRUMap<>(1000);
         this.hbaseTransaction = hbaseTransaction;
         familyDeletionCache = new HashMap<>();
+
     }
 
     @Override
@@ -69,15 +75,14 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
             }
         }
 
-        Optional<Long> ct = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
-        if (ct.isPresent()) {
-            commitCache.put(v.getTimestamp(), ct.get());
+        Optional<Long> commitTS = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
+        if (commitTS.isPresent()) {
             if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL &&
                     snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
                 return runUserFilter(v, ReturnCode.INCLUDE);
             }
             if (CellUtils.isFamilyDeleteCell(v)) {
-                familyDeletionCache.put(createImmutableBytesWritable(v), ct.get());
+                familyDeletionCache.put(createImmutableBytesWritable(v), commitTS.get());
                 if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
                     return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
                 } else {
@@ -134,8 +139,12 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
     // For family delete cells, the sc hasn't arrived yet so get sc from region before going to ct
     private Optional<Long> getCommitIfInSnapshot(Cell v, boolean getShadowCellBeforeCT) throws IOException {
         Long cachedCommitTS = commitCache.get(v.getTimestamp());
-        if (cachedCommitTS != null && hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
-            return Optional.of(cachedCommitTS);
+        if (cachedCommitTS != null) {
+            if (hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
+                return Optional.of(cachedCommitTS);
+            } else {
+                return Optional.absent();
+            }
         }
         if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
             return Optional.of(v.getTimestamp());
@@ -151,13 +160,20 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
 
             if (!shadowCell.isEmpty()) {
                 long commitTS = Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0]));
+                commitCache.put(v.getTimestamp(), commitTS);
                 if (commitTS <= hbaseTransaction.getStartTimestamp()) {
                     return Optional.of(commitTS);
                 }
             }
         }
 
-        return snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache);
+        Optional<Long> commitTS = snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache);
+        if (commitTS.isPresent()) {
+            commitCache.put(v.getTimestamp(), commitTS.get());
+        } else {
+            commitCache.put(v.getTimestamp(), Long.MAX_VALUE);
+        }
+        return commitTS;
     }
 
 
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index ebf2ba3..46b1c4a 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -20,15 +20,18 @@ package org.apache.omid.transaction;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -44,6 +47,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.FamilyFilter;
@@ -59,6 +63,7 @@ import org.apache.omid.metrics.NullMetricsProvider;
 import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
 import org.apache.omid.tso.TSOServer;
 import org.apache.omid.tso.TSOServerConfig;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -883,4 +888,92 @@ public class TestSnapshotFilter {
         tt.close();
     }
 
+
+    @Test (timeOut = 60_000)
+    public void testFilterCommitCacheInSnapshot() throws Throwable {
+        String TEST_TABLE = "testScanWithFilter";
+        byte[] rowName = Bytes.toBytes("row1");
+        byte[] famName = Bytes.toBytes(TEST_FAMILY);
+        // Scenario: the scanned cells were written by a transaction that committed before the reader began.
+        createTableIfNotExists(TEST_TABLE, famName);
+        TTable tt = new TTable(connection, TEST_TABLE);
+        // tx1 writes 200 columns into one row and commits; tx3 is started afterwards and acts as the reader.
+        Transaction tx1 = tm.begin();
+        Put put = new Put(rowName);
+        for (int i = 0; i < 200; ++i) {
+            byte[] dataValue1 = Bytes.toBytes("some data");
+            byte[] colName = Bytes.toBytes("col" + i);
+            put.addColumn(famName, colName, dataValue1);
+        }
+        tt.put(tx1, put);
+        tm.commit(tx1);
+        Transaction tx3 = tm.begin();
+        // Wrap the snapshot filter in a Mockito spy so that calls to getTSIfInSnapshot can be counted below.
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient()));
+        Filter newFilter = TransactionFilters.getVisibilityFilter(null,
+                snapshotFilter, (HBaseTransaction) tx3);
+        // Scan through a plain HBase Table rather than the TTable; the filter is applied by hand to each cell.
+        Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
+
+        Scan scan = new Scan();
+        ResultScanner scanner = rawTable.getScanner(scan);
+        // Feed every scanned cell to the visibility filter; getTSIfInSnapshot is expected never to be invoked.
+        for(Result row: scanner) {
+            for(Cell cell: row.rawCells()) {
+                newFilter.filterKeyValue(cell);
+
+            }
+        }
+        verify(snapshotFilter, Mockito.times(0))
+                .getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
+        tm.commit(tx3);
+        tt.close();
+    }
+
+    @Test (timeOut = 60_000)
+    public void testFilterCommitCacheNotInSnapshot() throws Throwable {
+        String TEST_TABLE = "testScanWithFilter";
+        byte[] rowName = Bytes.toBytes("row1");
+        byte[] famName = Bytes.toBytes(TEST_FAMILY);
+        // Scenario: the scanned cells belong to a transaction that has not committed when the reader scans.
+        createTableIfNotExists(TEST_TABLE, famName);
+        TTable tt = new TTable(connection, TEST_TABLE);
+
+
+        // tx1 writes 200 columns into one row and is left uncommitted (this test never calls tm.commit(tx1)).
+        Transaction tx1 = tm.begin();
+        Put put = new Put(rowName);
+        for (int i = 0; i < 200; ++i) {
+            byte[] dataValue1 = Bytes.toBytes("some data");
+            byte[] colName = Bytes.toBytes("col" + i);
+            put.addColumn(famName, colName, dataValue1);
+        }
+        tt.put(tx1, put);
+
+        // Begin a second transaction (the reader) and build its visibility filter around a spied SnapshotFilterImpl.
+        Transaction tx = tm.begin();
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient()));
+        Filter newFilter = TransactionFilters.getVisibilityFilter(null,
+                snapshotFilter, (HBaseTransaction) tx);
+        // Scan through a plain HBase Table rather than the TTable; the filter is applied by hand to each cell.
+        Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
+
+        Scan scan = new Scan();
+        ResultScanner scanner = rawTable.getScanner(scan);
+        // Expect exactly one getTSIfInSnapshot call; presumably later cells hit the filter's commit cache - confirm.
+        for(Result row: scanner) {
+            for(Cell cell: row.rawCells()) {
+                newFilter.filterKeyValue(cell);
+            }
+        }
+        verify(snapshotFilter, Mockito.times(1))
+                .getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
+        tt.close();
+    }
+
+
 }