You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by oh...@apache.org on 2018/08/09 11:46:09 UTC

incubator-omid git commit: [OMID-102] Fix coprocessor scanner bug. Fix hbase-0 dependencies.

Repository: incubator-omid
Updated Branches:
  refs/heads/phoenix-integration 395fb46f2 -> 3f387eadb


[OMID-102] Fix coprocessor scanner bug. Fix hbase-0 dependencies.

Signed-off-by: Ohad Shacham <oh...@yahoo-inc.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/3f387ead
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/3f387ead
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/3f387ead

Branch: refs/heads/phoenix-integration
Commit: 3f387eadb4b49067c0dfea0374dc9caab5a5bd3f
Parents: 395fb46
Author: Yonatan Gottesman <yo...@gmail.com>
Authored: Wed Aug 8 16:13:36 2018 +0300
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Thu Aug 9 14:39:45 2018 +0300

----------------------------------------------------------------------
 .../hbase/regionserver/OmidRegionScanner.java   | 98 --------------------
 .../omid/transaction/OmidSnapshotFilter.java    | 38 +++++++-
 .../omid/transaction/TestSnapshotFilter.java    | 46 +++++++++
 3 files changed, 80 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3f387ead/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
deleted file mode 100644
index 5f7de96..0000000
--- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.omid.transaction.SnapshotFilterImpl;
-
-public class OmidRegionScanner implements RegionScanner {
-
-    private RegionScanner scanner;
-    private SnapshotFilterImpl snapshotFilter;
-    private final Queue<SnapshotFilterImpl> snapshotFilterQueue;
-
-
-    public OmidRegionScanner(SnapshotFilterImpl snapshotFilter,
-                             RegionScanner s, Queue<SnapshotFilterImpl> snapshotFilterQueue) {
-        this.snapshotFilter = snapshotFilter;
-        this.scanner = s;
-        this.snapshotFilterQueue = snapshotFilterQueue;
-    }
-
-    @Override
-    public boolean next(List<Cell> results) throws IOException {
-       return scanner.next(results);
-    }
-
-    @Override
-    public boolean next(List<Cell> list, ScannerContext scannerContext) throws IOException {
-        return scanner.next(list, scannerContext);
-    }
-
-    @Override
-    public void close() throws IOException {
-        scanner.close();
-        snapshotFilterQueue.add(snapshotFilter);
-    }
-
-    @Override
-    public HRegionInfo getRegionInfo() {
-        return scanner.getRegionInfo();
-    }
-
-    @Override
-    public boolean isFilterDone() throws IOException {
-        return scanner.isFilterDone();
-    }
-
-    @Override
-    public boolean reseek(byte[] row) throws IOException {
-        return scanner.reseek(row);
-    }
-
-    @Override
-    public long getMaxResultSize() {
-        return scanner.getMaxResultSize();
-    }
-
-    @Override
-    public long getMvccReadPoint() {
-        return scanner.getMvccReadPoint();
-    }
-
-    @Override
-    public int getBatch() {
-        return scanner.getBatch();
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> result) throws IOException {
-        return scanner.nextRaw(result);
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> list, ScannerContext scannerContext) throws IOException {
-        return scanner.nextRaw(list, scannerContext);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3f387ead/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index 0d13b62..c024c15 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -22,7 +22,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 
-import org.apache.hadoop.hbase.regionserver.OmidRegionScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.hbase.HBaseCommitTable;
@@ -45,7 +45,9 @@ import java.io.IOException;
 
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY;
@@ -59,8 +61,8 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
 
     private HBaseCommitTableConfig commitTableConf = null;
     private Configuration conf = null;
-    Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
-
+    private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
+    private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap();
     private CommitTable.Client inMemoryCommitTable = null;
 
     public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
@@ -154,10 +156,38 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
         Filter newFilter = TransactionFilters.getVisibilityFilter(scan.getFilter(),
                 snapshotFilter, hbaseTransaction);
         scan.setFilter(newFilter);
+        snapshotFilterMap.put(scan, snapshotFilter);
+        return s;
+    }
+
+    @Override
+    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
+                                         Scan scan,
+                                         RegionScanner s) throws IOException {
+        byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE);
+
+        if (byteTransaction == null) {
+            return s;
+        }
 
-        return new OmidRegionScanner(snapshotFilter, s, snapshotFilterQueue);
+        SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(scan);
+        assert(snapshotFilter != null);
+        snapshotFilterMap.remove(scan);
+        snapshotFilterMap.put(s, snapshotFilter);
+        return s;
     }
 
+    @Override
+    public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s)
+            throws IOException {
+        SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(s);
+        if (snapshotFilter != null) {
+            snapshotFilterQueue.add(snapshotFilter);
+        }
+    }
+
+
+
     private HBaseTransaction getHBaseTransaction(byte[] byteTransaction)
             throws InvalidProtocolBufferException {
         TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3f387ead/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index aeae112..e8c12fa 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -286,6 +286,52 @@ public class TestSnapshotFilter {
     }
 
 
+    // This test will fail if filtering is done before snapshot filtering
+    @Test(timeOut = 60_000)
+    public void testServerSideSnapshotScannerFiltering() throws Throwable {
+        byte[] rowName1 = Bytes.toBytes("row1");
+        byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
+        byte[] colName1 = Bytes.toBytes("col1");
+        byte[] dataValue1 = Bytes.toBytes("testWrite-1");
+        byte[] dataValue2 = Bytes.toBytes("testWrite-2");
+
+        String TEST_TABLE = "testServerSideSnapshotFiltering";
+        createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+
+        TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+        Transaction tx1 = tm.begin();
+        Put put1 = new Put(rowName1);
+        put1.add(famName1, colName1, dataValue1);
+        tt.put(tx1, put1);
+        tm.commit(tx1);
+
+        Transaction tx2 = tm.begin();
+        Put put2 = new Put(rowName1);
+        put2.add(famName1, colName1, dataValue2);
+//        tt.put(tx2, put2);
+
+        Transaction tx3 = tm.begin();
+
+        // If snapshot filtering is not done in the server then the first value is
+        // "testWrite-2" and the whole row will be filtered out.
+        SingleColumnValueFilter filter = new SingleColumnValueFilter(
+                famName1,
+                colName1,
+                CompareFilter.CompareOp.EQUAL,
+                new SubstringComparator("testWrite-1"));
+
+
+        Scan scan = new Scan();
+        scan.setFilter(filter);
+
+        ResultScanner iterableRS = tt.getScanner(tx3, scan);
+        Result result = iterableRS.next();
+
+        assertTrue(result.size() == 1);
+    }
+
+
     @Test(timeOut = 60_000)
     public void testGetWithFamilyDelete() throws Throwable {
         byte[] rowName1 = Bytes.toBytes("row1");