You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/02/05 08:57:02 UTC
[incubator-omid] branch 1.0.1 updated: [OMID-129] - cache uncommitted transactions in TransactionVisibilityFilterBase with Long.MAX_VALUE as commit timestamp
This is an automated email from the ASF dual-hosted git repository.
yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git
The following commit(s) were added to refs/heads/1.0.1 by this push:
new cbe2a39 [OMID-129] - cache uncommitted transactions in TransactionVisibilityFilterBase with Long.MAX_VALUE as commit timestamp
cbe2a39 is described below
commit cbe2a391406e4c9771358029700fcc7a9e795a84
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Tue Feb 5 10:56:36 2019 +0200
[OMID-129] - cache uncommitted transactions in TransactionVisibilityFilterBase with Long.MAX_VALUE as commit timestamp
---
hbase-coprocessor/pom.xml | 5 ++
.../TransactionVisibilityFilterBase.java | 34 +++++---
.../omid/transaction/TestSnapshotFilter.java | 93 ++++++++++++++++++++++
3 files changed, 123 insertions(+), 9 deletions(-)
diff --git a/hbase-coprocessor/pom.xml b/hbase-coprocessor/pom.xml
index 3375db4..48ab6b1 100644
--- a/hbase-coprocessor/pom.xml
+++ b/hbase-coprocessor/pom.xml
@@ -136,6 +136,11 @@
<artifactId>${shims.artifactId}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>4.1</version>
+ </dependency>
<!-- end testing -->
</dependencies>
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java
index 287d2a2..b3a6baf 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java
@@ -19,17 +19,22 @@ package org.apache.omid.transaction;
import com.google.common.base.Optional;
+
+import org.apache.commons.collections4.map.LRUMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.OmidFilterBase;
+
import java.io.IOException;
import java.util.HashMap;
+
import java.util.List;
import java.util.Map;
@@ -39,7 +44,7 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
// optional sub-filter to apply to visible cells
private final Filter userFilter;
private final SnapshotFilterImpl snapshotFilter;
- private final Map<Long ,Long> commitCache;
+ private final LRUMap<Long ,Long> commitCache;
private final HBaseTransaction hbaseTransaction;
// This cache is cleared when moving to the next row
@@ -51,9 +56,10 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
HBaseTransaction hbaseTransaction) {
this.userFilter = cellFilter;
this.snapshotFilter = snapshotFilter;
- commitCache = new HashMap<>();
+ commitCache = new LRUMap<>(1000);
this.hbaseTransaction = hbaseTransaction;
familyDeletionCache = new HashMap<>();
+
}
@Override
@@ -69,15 +75,14 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
}
}
- Optional<Long> ct = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
- if (ct.isPresent()) {
- commitCache.put(v.getTimestamp(), ct.get());
+ Optional<Long> commitTS = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
+ if (commitTS.isPresent()) {
if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL &&
snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
return runUserFilter(v, ReturnCode.INCLUDE);
}
if (CellUtils.isFamilyDeleteCell(v)) {
- familyDeletionCache.put(createImmutableBytesWritable(v), ct.get());
+ familyDeletionCache.put(createImmutableBytesWritable(v), commitTS.get());
if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
} else {
@@ -134,8 +139,12 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
// For family delete cells, the sc hasn't arrived yet so get sc from region before going to ct
private Optional<Long> getCommitIfInSnapshot(Cell v, boolean getShadowCellBeforeCT) throws IOException {
Long cachedCommitTS = commitCache.get(v.getTimestamp());
- if (cachedCommitTS != null && hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
- return Optional.of(cachedCommitTS);
+ if (cachedCommitTS != null) {
+ if (hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
+ return Optional.of(cachedCommitTS);
+ } else {
+ return Optional.absent();
+ }
}
if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
return Optional.of(v.getTimestamp());
@@ -151,13 +160,20 @@ public class TransactionVisibilityFilterBase extends OmidFilterBase {
if (!shadowCell.isEmpty()) {
long commitTS = Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0]));
+ commitCache.put(v.getTimestamp(), commitTS);
if (commitTS <= hbaseTransaction.getStartTimestamp()) {
return Optional.of(commitTS);
}
}
}
- return snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache);
+ Optional<Long> commitTS = snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache);
+ if (commitTS.isPresent()) {
+ commitCache.put(v.getTimestamp(), commitTS.get());
+ } else {
+ commitCache.put(v.getTimestamp(), Long.MAX_VALUE);
+ }
+ return commitTS;
}
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index ebf2ba3..46b1c4a 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -20,15 +20,18 @@ package org.apache.omid.transaction;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -44,6 +47,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
@@ -59,6 +63,7 @@ import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -883,4 +888,92 @@ public class TestSnapshotFilter {
tt.close();
}
+
+ @Test (timeOut = 60_000)
+ public void testFilterCommitCacheInSnapshot() throws Throwable {
+ String TEST_TABLE = "testScanWithFilter";
+ byte[] rowName = Bytes.toBytes("row1");
+ byte[] famName = Bytes.toBytes(TEST_FAMILY);
+
+ createTableIfNotExists(TEST_TABLE, famName);
+ TTable tt = new TTable(connection, TEST_TABLE);
+
+ Transaction tx1 = tm.begin();
+ Put put = new Put(rowName);
+ for (int i = 0; i < 200; ++i) {
+ byte[] dataValue1 = Bytes.toBytes("some data");
+ byte[] colName = Bytes.toBytes("col" + i);
+ put.addColumn(famName, colName, dataValue1);
+ }
+ tt.put(tx1, put);
+ tm.commit(tx1);
+ Transaction tx3 = tm.begin();
+
+ Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+ SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+ tm.getCommitTableClient()));
+ Filter newFilter = TransactionFilters.getVisibilityFilter(null,
+ snapshotFilter, (HBaseTransaction) tx3);
+
+ Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
+
+ Scan scan = new Scan();
+ ResultScanner scanner = rawTable.getScanner(scan);
+
+ for(Result row: scanner) {
+ for(Cell cell: row.rawCells()) {
+ newFilter.filterKeyValue(cell);
+
+ }
+ }
+ verify(snapshotFilter, Mockito.times(0))
+ .getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
+ tm.commit(tx3);
+ tt.close();
+ }
+
+ @Test (timeOut = 60_000)
+ public void testFilterCommitCacheNotInSnapshot() throws Throwable {
+ String TEST_TABLE = "testScanWithFilter";
+ byte[] rowName = Bytes.toBytes("row1");
+ byte[] famName = Bytes.toBytes(TEST_FAMILY);
+
+ createTableIfNotExists(TEST_TABLE, famName);
+ TTable tt = new TTable(connection, TEST_TABLE);
+
+
+ //add some uncommitted values
+ Transaction tx1 = tm.begin();
+ Put put = new Put(rowName);
+ for (int i = 0; i < 200; ++i) {
+ byte[] dataValue1 = Bytes.toBytes("some data");
+ byte[] colName = Bytes.toBytes("col" + i);
+ put.addColumn(famName, colName, dataValue1);
+ }
+ tt.put(tx1, put);
+
+ //try to scan from tx
+ Transaction tx = tm.begin();
+ Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+ SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+ tm.getCommitTableClient()));
+ Filter newFilter = TransactionFilters.getVisibilityFilter(null,
+ snapshotFilter, (HBaseTransaction) tx);
+
+ Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
+
+ Scan scan = new Scan();
+ ResultScanner scanner = rawTable.getScanner(scan);
+
+ for(Result row: scanner) {
+ for(Cell cell: row.rawCells()) {
+ newFilter.filterKeyValue(cell);
+ }
+ }
+ verify(snapshotFilter, Mockito.times(1))
+ .getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
+ tt.close();
+ }
+
+
}