You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/04/18 07:11:24 UTC
[incubator-omid] branch 1.0.1 updated: [OMID-145] add commit cache
to compation
This is an automated email from the ASF dual-hosted git repository.
yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git
The following commit(s) were added to refs/heads/1.0.1 by this push:
new 8cfacb1 [OMID-145] add commit cache to compation
8cfacb1 is described below
commit 8cfacb177a2d12440b732ef36e92b30d5fad07c6
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Tue Apr 16 15:27:02 2019 +0300
[OMID-145] add commit cache to compation
---
.../org/apache/omid/transaction/CompactorScanner.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
index 9fdf937..70c1d27 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
+import org.apache.commons.collections4.map.LRUMap;
import org.apache.omid.HBaseShims;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.Client;
@@ -44,7 +45,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+
import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
@@ -63,6 +64,7 @@ public class CompactorScanner implements InternalScanner {
private boolean hasMoreRows = false;
private List<Cell> currentRowWorthValues = new ArrayList<Cell>();
+ private final LRUMap<Long ,Optional<CommitTimestamp>> commitCache;
public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
InternalScanner internalScanner,
@@ -76,6 +78,7 @@ public class CompactorScanner implements InternalScanner {
this.lowWatermark = getLowWatermarkFromCommitTable();
// Obtain the table in which the scanner is going to operate
this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
+ commitCache = new LRUMap<>(1000);
LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
lowWatermark, hRegion.getRegionInfo());
}
@@ -226,9 +229,14 @@ public class CompactorScanner implements InternalScanner {
}
private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
+ Optional<CommitTimestamp> cachedValue = commitCache.get(cell.getTimestamp());
+ if (cachedValue != null) {
+ return cachedValue;
+ }
try {
Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
if (ct.isPresent()) {
+ commitCache.put(cell.getTimestamp(), ct);
return Optional.of(ct.get());
} else {
Get g = new Get(CellUtil.cloneRow(cell));
@@ -240,8 +248,11 @@ public class CompactorScanner implements InternalScanner {
g.setTimeStamp(cell.getTimestamp());
Result r = hRegion.get(g);
if (r.containsColumn(family, qualifier)) {
- return Optional.of(new CommitTimestamp(SHADOW_CELL,
+ Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
Bytes.toLong(r.getValue(family, qualifier)), true));
+ commitCache.put(cell.getTimestamp(), retval);
+ return retval;
+
}
}
} catch (InterruptedException e) {
@@ -250,7 +261,7 @@ public class CompactorScanner implements InternalScanner {
} catch (ExecutionException e) {
throw new IOException("Error getting commit timestamp from commit table", e);
}
-
+ commitCache.put(cell.getTimestamp(), Optional.<CommitTimestamp>absent());
return Optional.absent();
}