You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by oh...@apache.org on 2018/02/07 10:45:04 UTC

incubator-omid git commit: [OMID-83] Attributes added to Put, Get, and Scan are not propagated to HBase. In many cases, as in the Phoenix case, these attributes are required and should be propagated to the server side. In Phoenix for example, attributes

Repository: incubator-omid
Updated Branches:
  refs/heads/phoenix-integration fab2cfebb -> 9c48cfde9


[OMID-83] Attributes added to Put, Get, and Scan are not propagated to HBase. In many cases, as in the Phoenix case, these attributes are required and should be propagated to the server side. In Phoenix, for example, attributes are used to mark data that should be propagated to the secondary index.
        This commit propagates the attributes to the HBase side.


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/9c48cfde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/9c48cfde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/9c48cfde

Branch: refs/heads/phoenix-integration
Commit: 9c48cfde9b8ac038686e0e6ce3a3c43b8d75e0d8
Parents: fab2cfe
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Wed Feb 7 12:44:37 2018 +0200
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Wed Feb 7 12:44:37 2018 +0200

----------------------------------------------------------------------
 .../transaction/AttributeSetSnapshotFilter.java |  2 +-
 .../apache/omid/transaction/SnapshotFilter.java |  2 +-
 .../omid/transaction/SnapshotFilterImpl.java    |  9 ++--
 .../org/apache/omid/transaction/TTable.java     | 53 +++++++++++++-------
 .../omid/transaction/TestShadowCells.java       |  2 +-
 .../hbase/regionserver/OmidRegionScanner.java   |  9 ++--
 .../omid/transaction/OmidSnapshotFilter.java    |  4 +-
 7 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9c48cfde/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
index 6906939..bf2adf5 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
@@ -65,7 +65,7 @@ public class AttributeSetSnapshotFilter implements SnapshotFilter {
 
     @Override
     public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
-                                      int versionsToRequest, Map<String, List<Cell>> familyDeletionCache) throws IOException {
+                                      int versionsToRequest, Map<String, List<Cell>> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9c48cfde/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
index 112544f..3027d89 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
@@ -37,7 +37,7 @@ public interface SnapshotFilter {
     public ResultScanner getScanner(TTable ttable, Scan scan, HBaseTransaction transaction) throws IOException;
 
     public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
-            int versionsToRequest, Map<String, List<Cell>> familyDeletionCache) throws IOException;
+            int versionsToRequest, Map<String, List<Cell>> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException;
 
     public boolean isCommitted(HBaseCellId hBaseCellId, long epoch) throws TransactionException;
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9c48cfde/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
index a656aec..6729140 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
@@ -362,7 +362,7 @@ public class SnapshotFilterImpl implements SnapshotFilter {
      */
     @Override
     public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
-                                      int versionsToRequest, Map<String, List<Cell>> familyDeletionCache) throws IOException {
+                                      int versionsToRequest, Map<String, List<Cell>> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException {
 
         assert (rawCells != null && transaction != null && versionsToRequest >= 1);
 
@@ -411,6 +411,9 @@ public class SnapshotFilterImpl implements SnapshotFilter {
             if (!snapshotValueFound) {
                 assert (oldestCell != null);
                 Get pendingGet = createPendingGet(oldestCell, numberOfVersionsToFetch);
+                for (Map.Entry<String,byte[]> entry : attributeMap.entrySet()) {
+                    pendingGet.setAttribute(entry.getKey(), entry.getValue());
+                }
                 pendingGetsList.add(pendingGet);
             }
         }
@@ -420,7 +423,7 @@ public class SnapshotFilterImpl implements SnapshotFilter {
             for (Result pendingGetResult : pendingGetsResults) {
                 if (!pendingGetResult.isEmpty()) {
                     keyValuesInSnapshot.addAll(
-                        filterCellsForSnapshot(pendingGetResult.listCells(), transaction, numberOfVersionsToFetch, familyDeletionCache));
+                        filterCellsForSnapshot(pendingGetResult.listCells(), transaction, numberOfVersionsToFetch, familyDeletionCache, attributeMap));
                 }
             }
         }
@@ -436,7 +439,7 @@ public class SnapshotFilterImpl implements SnapshotFilter {
 
         List<Cell> filteredKeyValues = Collections.emptyList();
         if (!result.isEmpty()) {
-            filteredKeyValues = ttable.filterCellsForSnapshot(result.listCells(), transaction, get.getMaxVersions(), new HashMap<String, List<Cell>>());
+            filteredKeyValues = ttable.filterCellsForSnapshot(result.listCells(), transaction, get.getMaxVersions(), new HashMap<String, List<Cell>>(), get.getAttributesMap());
         }
 
         return Result.create(filteredKeyValues);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9c48cfde/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index 12dfb71..53a9857 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -17,8 +17,18 @@
  */
 package org.apache.omid.transaction;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -33,6 +43,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -41,23 +52,12 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.CommitTable.CommitTimestamp;
-import org.apache.omid.proto.TSOProto;
 import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 
 /**
  * Provides transactional methods for accessing and modifying a given snapshot of data identified by an opaque {@link
@@ -162,6 +162,7 @@ public class TTable implements Closeable {
 
         final long readTimestamp = transaction.getReadTimestamp();
         final Get tsget = new Get(get.getRow()).setFilter(get.getFilter());
+        propagateAttributes(get, tsget);
         TimeRange timeRange = get.getTimeRange();
         long startTime = timeRange.getMin();
         long endTime = Math.min(timeRange.getMax(), readTimestamp + 1);
@@ -186,6 +187,14 @@ public class TTable implements Closeable {
         return snapshotFilter.get(this, tsget, transaction);
     }
 
+    private void propagateAttributes(OperationWithAttributes from, OperationWithAttributes to) {
+        Map<String,byte[]> attributeMap = from.getAttributesMap();
+
+        for (Map.Entry<String,byte[]> entry : attributeMap.entrySet()) {
+            to.setAttribute(entry.getKey(), entry.getValue());
+        }
+    }
+
     private void familyQualifierBasedDeletion(HBaseTransaction tx, Put deleteP, Get deleteG) throws IOException {
         Result result = this.get(tx, deleteG);
         if (!result.isEmpty()) {
@@ -232,6 +241,8 @@ public class TTable implements Closeable {
 
         final Put deleteP = new Put(delete.getRow(), writeTimestamp);
         final Get deleteG = new Get(delete.getRow());
+        propagateAttributes(delete, deleteP);
+        propagateAttributes(delete, deleteG);
         Map<byte[], List<Cell>> fmap = delete.getFamilyCellMap();
         if (fmap.isEmpty()) {
             familyQualifierBasedDeletion(transaction, deleteP, deleteG);
@@ -310,6 +321,7 @@ public class TTable implements Closeable {
 
         // create put with correct ts
         final Put tsput = new Put(put.getRow(), writeTimestamp);
+        propagateAttributes(put, tsput);
         Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
         for (List<Cell> kvl : kvs.values()) {
             for (Cell c : kvl) {
@@ -350,6 +362,7 @@ public class TTable implements Closeable {
         Scan tsscan = new Scan(scan);
         tsscan.setMaxVersions(1);
         tsscan.setTimeRange(0, transaction.getReadTimestamp() + 1);
+        propagateAttributes(scan, tsscan);
         Map<byte[], NavigableSet<byte[]>> kvs = scan.getFamilyMap();
         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
             byte[] family = entry.getKey();
@@ -368,10 +381,10 @@ public class TTable implements Closeable {
         return snapshotFilter.getScanner(this, tsscan, transaction);
     }
 
-    
+
     List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
-                                      int versionsToRequest, Map<String, List<Cell>> familyDeletionCache) throws IOException {
-        return snapshotFilter.filterCellsForSnapshot(rawCells, transaction, versionsToRequest, familyDeletionCache);
+                                      int versionsToRequest, Map<String, List<Cell>> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException {
+        return snapshotFilter.filterCellsForSnapshot(rawCells, transaction, versionsToRequest, familyDeletionCache, attributeMap);
     }
 
 
@@ -381,6 +394,7 @@ public class TTable implements Closeable {
         private ResultScanner innerScanner;
         private int maxVersions;
         Map<String, List<Cell>> familyDeletionCache;
+        private Map<String,byte[]> attributeMap;
 
         TransactionalClientScanner(HBaseTransaction state, Scan scan, int maxVersions)
             throws IOException {
@@ -388,6 +402,7 @@ public class TTable implements Closeable {
             this.innerScanner = table.getScanner(scan);
             this.maxVersions = maxVersions;
             this.familyDeletionCache = new HashMap<String, List<Cell>>();
+            this.attributeMap = scan.getAttributesMap();
         }
 
 
@@ -400,7 +415,7 @@ public class TTable implements Closeable {
                     return null;
                 }
                 if (!result.isEmpty()) {
-                    filteredResult = filterCellsForSnapshot(result.listCells(), state, maxVersions, familyDeletionCache);
+                    filteredResult = filterCellsForSnapshot(result.listCells(), state, maxVersions, familyDeletionCache, attributeMap);
                 }
             }
             return Result.create(filteredResult);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9c48cfde/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index 8a35f9a..75b8ee9 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -346,7 +346,7 @@ public class TestShadowCells extends OmidTestBase {
                             return (List<KeyValue>) invocation.callRealMethod();
                         }
                     }).when(table).filterCellsForSnapshot(Matchers.<List<Cell>>any(),
-                            any(HBaseTransaction.class), anyInt(), Matchers.<Map<String, List<Cell>>>any());
+                            any(HBaseTransaction.class), anyInt(), Matchers.<Map<String, List<Cell>>>any(), Matchers.<Map<String,byte[]>>any());
 
                     TransactionManager tm = newTransactionManager(context);
                     if (hasShadowCell(row,

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9c48cfde/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
index 752e9a7..6c95f65 100644
--- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java
@@ -37,16 +37,19 @@ public class OmidRegionScanner implements RegionScanner {
     private HBaseTransaction transaction;
     private int maxVersions;
     private Map<String, List<Cell>> familyDeletionCache;
-    
+    private Map<String,byte[]> attributeMap;
+
     public OmidRegionScanner(SnapshotFilterImpl snapshotFilter,
                       RegionScanner s,
                       HBaseTransaction transaction,
-                      int maxVersions) {
+                      int maxVersions,
+                      Map<String,byte[]> attributeMap) {
         this.snapshotFilter = snapshotFilter;
         this.scanner = s;
         this.transaction = transaction;
         this.maxVersions = maxVersions;
         this.familyDeletionCache = new HashMap<String, List<Cell>>();
+        this.attributeMap = attributeMap;
     }
 
     @Override
@@ -115,7 +118,7 @@ public class OmidRegionScanner implements RegionScanner {
                 return false;
             }
 
-            filteredResult = snapshotFilter.filterCellsForSnapshot(filteredResult, transaction, maxVersions, familyDeletionCache);
+            filteredResult = snapshotFilter.filterCellsForSnapshot(filteredResult, transaction, maxVersions, familyDeletionCache, attributeMap);
         }
 
         for (Cell cell : filteredResult) {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9c48cfde/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index daf319c..a621a2f 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -112,7 +112,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
             VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel());
 
             HBaseTransaction hbaseTransaction = new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<HBaseCellId>(), null);
-            filteredKeyValues = snapshotFilter.filterCellsForSnapshot(res.listCells(), hbaseTransaction, get.getMaxVersions(), new HashMap<String, List<Cell>>());
+            filteredKeyValues = snapshotFilter.filterCellsForSnapshot(res.listCells(), hbaseTransaction, get.getMaxVersions(), new HashMap<String, List<Cell>>(), get.getAttributesMap());
         }
 
         for (Cell cell : filteredKeyValues) {
@@ -146,7 +146,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
 
         snapshotFilter.setTableAccessWrapper(regionAccessWrapper);
 
-        return new OmidRegionScanner(snapshotFilter, s, hbaseTransaction, 1);
+        return new OmidRegionScanner(snapshotFilter, s, hbaseTransaction, 1, scan.getAttributesMap());
     }
 
     private CommitTable.Client initAndGetCommitTableClient() throws IOException {