You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by oh...@apache.org on 2018/08/09 11:46:09 UTC
incubator-omid git commit: [OMID-102] Fix coprocessor scanner bug.
Fix hbase-0 dependencies.
Repository: incubator-omid
Updated Branches:
refs/heads/phoenix-integration 395fb46f2 -> 3f387eadb
[OMID-102] Fix coprocessor scanner bug. Fix hbase-0 dependencies.
Signed-off-by: Ohad Shacham <oh...@yahoo-inc.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/3f387ead
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/3f387ead
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/3f387ead
Branch: refs/heads/phoenix-integration
Commit: 3f387eadb4b49067c0dfea0374dc9caab5a5bd3f
Parents: 395fb46
Author: Yonatan Gottesman <yo...@gmail.com>
Authored: Wed Aug 8 16:13:36 2018 +0300
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Thu Aug 9 14:39:45 2018 +0300
----------------------------------------------------------------------
.../hbase/regionserver/OmidRegionScanner.java | 98 --------------------
.../omid/transaction/OmidSnapshotFilter.java | 38 +++++++-
.../omid/transaction/TestSnapshotFilter.java | 46 +++++++++
3 files changed, 80 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3f387ead/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
deleted file mode 100644
index 5f7de96..0000000
--- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.omid.transaction.SnapshotFilterImpl;
-
-public class OmidRegionScanner implements RegionScanner {
-
- private RegionScanner scanner;
- private SnapshotFilterImpl snapshotFilter;
- private final Queue<SnapshotFilterImpl> snapshotFilterQueue;
-
-
- public OmidRegionScanner(SnapshotFilterImpl snapshotFilter,
- RegionScanner s, Queue<SnapshotFilterImpl> snapshotFilterQueue) {
- this.snapshotFilter = snapshotFilter;
- this.scanner = s;
- this.snapshotFilterQueue = snapshotFilterQueue;
- }
-
- @Override
- public boolean next(List<Cell> results) throws IOException {
- return scanner.next(results);
- }
-
- @Override
- public boolean next(List<Cell> list, ScannerContext scannerContext) throws IOException {
- return scanner.next(list, scannerContext);
- }
-
- @Override
- public void close() throws IOException {
- scanner.close();
- snapshotFilterQueue.add(snapshotFilter);
- }
-
- @Override
- public HRegionInfo getRegionInfo() {
- return scanner.getRegionInfo();
- }
-
- @Override
- public boolean isFilterDone() throws IOException {
- return scanner.isFilterDone();
- }
-
- @Override
- public boolean reseek(byte[] row) throws IOException {
- return scanner.reseek(row);
- }
-
- @Override
- public long getMaxResultSize() {
- return scanner.getMaxResultSize();
- }
-
- @Override
- public long getMvccReadPoint() {
- return scanner.getMvccReadPoint();
- }
-
- @Override
- public int getBatch() {
- return scanner.getBatch();
- }
-
- @Override
- public boolean nextRaw(List<Cell> result) throws IOException {
- return scanner.nextRaw(result);
- }
-
- @Override
- public boolean nextRaw(List<Cell> list, ScannerContext scannerContext) throws IOException {
- return scanner.nextRaw(list, scannerContext);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3f387ead/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index 0d13b62..c024c15 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -22,7 +22,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.regionserver.OmidRegionScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTable;
@@ -45,7 +45,9 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY;
@@ -59,8 +61,8 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
private HBaseCommitTableConfig commitTableConf = null;
private Configuration conf = null;
- Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
-
+ private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
+ private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap();
private CommitTable.Client inMemoryCommitTable = null;
public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
@@ -154,10 +156,38 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
Filter newFilter = TransactionFilters.getVisibilityFilter(scan.getFilter(),
snapshotFilter, hbaseTransaction);
scan.setFilter(newFilter);
+ snapshotFilterMap.put(scan, snapshotFilter);
+ return s;
+ }
+
+ @Override
+ public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
+ Scan scan,
+ RegionScanner s) throws IOException {
+ byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE);
+
+ if (byteTransaction == null) {
+ return s;
+ }
- return new OmidRegionScanner(snapshotFilter, s, snapshotFilterQueue);
+ SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(scan);
+ assert(snapshotFilter != null);
+ snapshotFilterMap.remove(scan);
+ snapshotFilterMap.put(s, snapshotFilter);
+ return s;
}
+ @Override
+ public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s)
+ throws IOException {
+ SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(s);
+ if (snapshotFilter != null) {
+ snapshotFilterQueue.add(snapshotFilter);
+ }
+ }
+
+
+
private HBaseTransaction getHBaseTransaction(byte[] byteTransaction)
throws InvalidProtocolBufferException {
TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3f387ead/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index aeae112..e8c12fa 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -286,6 +286,52 @@ public class TestSnapshotFilter {
}
+ // This test will fail if filtering is done before snapshot filtering
+ @Test(timeOut = 60_000)
+ public void testServerSideSnapshotScannerFiltering() throws Throwable {
+ byte[] rowName1 = Bytes.toBytes("row1");
+ byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
+ byte[] colName1 = Bytes.toBytes("col1");
+ byte[] dataValue1 = Bytes.toBytes("testWrite-1");
+ byte[] dataValue2 = Bytes.toBytes("testWrite-2");
+
+ String TEST_TABLE = "testServerSideSnapshotFiltering";
+ createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+
+ TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+ Transaction tx1 = tm.begin();
+ Put put1 = new Put(rowName1);
+ put1.add(famName1, colName1, dataValue1);
+ tt.put(tx1, put1);
+ tm.commit(tx1);
+
+ Transaction tx2 = tm.begin();
+ Put put2 = new Put(rowName1);
+ put2.add(famName1, colName1, dataValue2);
+// tt.put(tx2, put2);
+
+ Transaction tx3 = tm.begin();
+
+ // If snapshot filtering is not done in the server then the first value is
+ // "testWrite-2" and the whole row will be filtered out.
+ SingleColumnValueFilter filter = new SingleColumnValueFilter(
+ famName1,
+ colName1,
+ CompareFilter.CompareOp.EQUAL,
+ new SubstringComparator("testWrite-1"));
+
+
+ Scan scan = new Scan();
+ scan.setFilter(filter);
+
+ ResultScanner iterableRS = tt.getScanner(tx3, scan);
+ Result result = iterableRS.next();
+
+ assertTrue(result.size() == 1);
+ }
+
+
@Test(timeOut = 60_000)
public void testGetWithFamilyDelete() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");