You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2014/04/24 07:46:29 UTC

svn commit: r1589594 - in /hbase/branches/0.98/hbase-server/src: main/java/org/apache/hadoop/hbase/security/access/ test/java/org/apache/hadoop/hbase/security/access/

Author: anoopsamjohn
Date: Thu Apr 24 05:46:29 2014
New Revision: 1589594

URL: http://svn.apache.org/r1589594
Log:
HBASE-10970 [AccessController] Issues with covering cell permission checks. (Anoop)

Modified:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java?rev=1589594&r1=1589593&r2=1589594&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java Thu Apr 24 05:46:29 2014
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -476,9 +477,31 @@ public class AccessController extends Ba
     }
   }
 
-  private void requireCoveringPermission(String request, RegionCoprocessorEnvironment e,
-      byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs,
-      boolean allVersions, Action...actions) throws IOException {
+  private enum OpType {
+    GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
+    PUT("put"),
+    DELETE("delete"),
+    CHECK_AND_PUT("checkAndPut"),
+    CHECK_AND_DELETE("checkAndDelete"),
+    INCREMENT_COLUMN_VALUE("incrementColumnValue"),
+    APPEND("append"),
+    INCREMENT("increment");
+
+    private String type;
+
+    private OpType(String type) {
+      this.type = type;
+    }
+
+    @Override
+    public String toString() {
+      return type;
+    }
+  }
+
+  private void requireCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
+      byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
+      throws IOException {
     User user = getActiveUser();
 
     // First check table or CF level permissions, if they grant access we can
@@ -490,7 +513,7 @@ public class AccessController extends Ba
     // HBASE-7123.
     AuthResult results[] = new AuthResult[actions.length];
     for (int i = 0; i < actions.length; i++) {
-      results[i] = permissionGranted(request, user, actions[i], e, familyMap);
+      results[i] = permissionGranted(request.type, user, actions[i], e, familyMap);
       if (!results[i].isAllowed()) {
         if (LOG.isTraceEnabled()) {
           LOG.trace("Got " + results[i] + ", added to cellCheckActions");
@@ -512,15 +535,23 @@ public class AccessController extends Ba
     // Table or CF permissions do not allow, enumerate the covered KVs. We
     // can stop at the first which does not grant access.
     int cellsChecked = 0;
-    opTs = opTs != HConstants.LATEST_TIMESTAMP ? opTs : 0;
     long latestCellTs = 0;
     if (canPersistCellACLs) {
       Get get = new Get(row);
-      if (allVersions) {
+      // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
+      // When every cell within a Mutation can carry a different TS, we cannot rely on only one
+      // version. We have to get every cell version, check its TS against the TS asked for in the
+      // Mutation, and skip those cells which are outside this Mutation TS. In case of Put, we
+      // have to consider only one such passing cell. In case of Delete we have to consider all
+      // the cell versions under this passing version. When a Delete Mutation contains columns
+      // which are version deletes, consider only one version for those column cells.
+      boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
+      if (considerCellTs) {
         get.setMaxVersions();
       } else {
         get.setMaxVersions(1);
       }
+      boolean diffCellTsFromOpTs = false;
       for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
         byte[] col = entry.getKey();
         // TODO: HBASE-7114 could possibly unify the collection type in family
@@ -548,8 +579,10 @@ public class AccessController extends Ba
               } else {
                 get.addColumn(col, CellUtil.cloneQualifier(cell));
               }
-              if (cell.getTimestamp() != HConstants.LATEST_TIMESTAMP) {
-                latestCellTs = Math.max(latestCellTs, cell.getTimestamp());
+              if (considerCellTs) {
+                long cellTs = cell.getTimestamp();
+                latestCellTs = Math.max(latestCellTs, cellTs);
+                diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
               }
             }
           }
@@ -565,15 +598,35 @@ public class AccessController extends Ba
       // the upper bound of a timerange is exclusive yet we need to examine
       // any cells found there inclusively.
       long latestTs = Math.max(opTs, latestCellTs);
-      if (latestTs == 0) {
+      if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
         latestTs = EnvironmentEdgeManager.currentTimeMillis();
       }
       get.setTimeRange(0, latestTs + 1);
+      // In case of Put operation we set to read all versions. This was done to consider the case
+      // where columns are added with TS other than the Mutation TS. But normally this won't be
+      // the case with Put; then there is no need to get all versions, only the latest version.
+      if (!diffCellTsFromOpTs && request == OpType.PUT) {
+        get.setMaxVersions(1);
+      }
       if (LOG.isTraceEnabled()) {
         LOG.trace("Scanning for cells with " + get);
       }
+      // This Map mirrors familyMap, but the key is a ByteRange rather than a byte[].
+      // That makes lookups easy, as we can create get keys over the Cell's family with
+      // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
+      Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
+      for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
+        if (entry.getValue() instanceof List) {
+          familyMap1.put(new SimpleByteRange(entry.getKey()), (List<Cell>) entry.getValue());
+        }
+      }
       RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
       List<Cell> cells = Lists.newArrayList();
+      Cell prevCell = null;
+      ByteRange curFam = new SimpleByteRange();
+      boolean curColAllVersions = (request == OpType.DELETE);
+      long curColCheckTs = opTs;
+      boolean foundColumn = false;
       try {
         boolean more = false;
         do {
@@ -584,10 +637,43 @@ public class AccessController extends Ba
             if (LOG.isTraceEnabled()) {
               LOG.trace("Found cell " + cell);
             }
+            boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
+            if (colChange) foundColumn = false;
+            prevCell = cell;
+            if (!curColAllVersions && foundColumn) {
+              continue;
+            }
+            if (colChange && considerCellTs) {
+              curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+              List<Cell> cols = familyMap1.get(curFam);
+              for (Cell col : cols) {
+                // null/empty qualifier is used to denote a Family delete. The TS and delete type
+                // associated with this is applicable for all columns within the family. That is
+                // why the below (col.getQualifierLength() == 0) check.
+                if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
+                    || CellUtil.matchingQualifier(cell, col)) {
+                  byte type = col.getTypeByte();
+                  if (considerCellTs)
+                    curColCheckTs = col.getTimestamp();
+                  // For a Delete op we check all versions by default. When a Delete Mutation
+                  // contains a version delete for a column, there is no need to check all the
+                  // covering cells within that column. Check all versions when the type is
+                  // DeleteColumn or DeleteFamily; one-version types are Delete/DeleteFamilyVersion.
+                  curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
+                      || (KeyValue.Type.DeleteFamily.getCode() == type);
+                  break;
+                }
+              }
+            }
+            if (cell.getTimestamp() > curColCheckTs) {
+              // Just ignore this cell. This is not a covering cell.
+              continue;
+            }
+            foundColumn = true;
             for (Action action: cellCheckActions) {
               // Are there permissions for this user for the cell?
               if (!authManager.authorize(user, getTableName(e), cell, false, action)) {
-                AuthResult authResult = AuthResult.deny(request, "Insufficient permissions",
+                AuthResult authResult = AuthResult.deny(request.type, "Insufficient permissions",
                   user, action, getTableName(e), CellUtil.cloneFamily(cell),
                   CellUtil.cloneQualifier(cell));
                 logResult(authResult);
@@ -612,7 +698,7 @@ public class AccessController extends Ba
       if (LOG.isTraceEnabled()) {
         LOG.trace("No cells found with scan");
       }
-      AuthResult authResult = AuthResult.deny(request, "Insufficient permissions",
+      AuthResult authResult = AuthResult.deny(request.type, "Insufficient permissions",
         user, cellCheckActions.get(0), getTableName(e), familyMap);
       logResult(authResult);
       throw new AccessDeniedException("Insufficient permissions " +
@@ -623,7 +709,7 @@ public class AccessController extends Ba
     // thousands of fine grained decisions with providing detail.
     for (byte[] family: familyMap.keySet()) {
       for (Action action: actions) {
-        logResult(AuthResult.allow(request, "Permission granted", user, action,
+        logResult(AuthResult.allow(request.type, "Permission granted", user, action,
           getTableName(e), family, null));
       }
     }
@@ -1167,8 +1253,8 @@ public class AccessController extends Ba
       final byte [] row, final byte [] family, final Result result)
       throws IOException {
     assert family != null;
-    requireCoveringPermission("getClosestRowBefore", c.getEnvironment(), row,
-      makeFamilyMap(family, null), HConstants.LATEST_TIMESTAMP, false, Permission.Action.READ);
+    requireCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, c.getEnvironment(), row,
+        makeFamilyMap(family, null), HConstants.LATEST_TIMESTAMP, Permission.Action.READ);
   }
 
   @Override
@@ -1194,8 +1280,8 @@ public class AccessController extends Ba
     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
     // change the ACL of any previous Put. This allows simple evolution of
     // security policy over time without requiring expensive updates.
-    requireCoveringPermission("put", c.getEnvironment(), put.getRow(),
-      put.getFamilyCellMap(), put.getTimeStamp(), false, Permission.Action.WRITE);
+    requireCoveringPermission(OpType.PUT, c.getEnvironment(), put.getRow(),
+      put.getFamilyCellMap(), put.getTimeStamp(), Permission.Action.WRITE);
     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
     if (bytes != null) {
       if (canPersistCellACLs) {
@@ -1227,8 +1313,8 @@ public class AccessController extends Ba
     // compaction could remove them. If the user doesn't have permission to
     // overwrite any of the visible versions ('visible' defined as not covered
     // by a tombstone already) then we have to disallow this operation.
-    requireCoveringPermission("delete", c.getEnvironment(), delete.getRow(),
-      delete.getFamilyCellMap(), delete.getTimeStamp(), true, Action.WRITE);
+    requireCoveringPermission(OpType.DELETE, c.getEnvironment(), delete.getRow(),
+      delete.getFamilyCellMap(), delete.getTimeStamp(), Action.WRITE);
   }
 
   @Override
@@ -1247,9 +1333,8 @@ public class AccessController extends Ba
       final ByteArrayComparable comparator, final Put put,
       final boolean result) throws IOException {
     // Require READ and WRITE permissions on the table, CF, and KV to update
-    requireCoveringPermission("checkAndPut", c.getEnvironment(), row,
-      makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, false,
-      Action.READ, Action.WRITE);
+    requireCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row,
+        makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, Action.READ, Action.WRITE);
     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
     if (bytes != null) {
       if (canPersistCellACLs) {
@@ -1274,9 +1359,8 @@ public class AccessController extends Ba
     }
     // Require READ and WRITE permissions on the table, CF, and the KV covered
     // by the delete
-    requireCoveringPermission("checkAndDelete", c.getEnvironment(), row,
-      makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, false,
-      Action.READ, Action.WRITE);
+    requireCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row,
+        makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, Action.READ, Action.WRITE);
     return result;
   }
 
@@ -1287,9 +1371,8 @@ public class AccessController extends Ba
       throws IOException {
     // Require WRITE permission to the table, CF, and the KV to be replaced by the
     // incremented value
-    requireCoveringPermission("incrementColumnValue", c.getEnvironment(), row,
-      makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, false,
-      Action.WRITE);
+    requireCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, c.getEnvironment(), row,
+        makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, Action.WRITE);
     return -1;
   }
 
@@ -1297,9 +1380,8 @@ public class AccessController extends Ba
   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
       throws IOException {
     // Require WRITE permission to the table, CF, and the KV to be appended
-    requireCoveringPermission("append", c.getEnvironment(), append.getRow(),
-      append.getFamilyCellMap(), append.getTimeStamp(), false,
-      Action.WRITE);
+    requireCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
+        append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE);
     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
     if (bytes != null) {
       if (canPersistCellACLs) {
@@ -1317,9 +1399,8 @@ public class AccessController extends Ba
       throws IOException {
     // Require WRITE permission to the table, CF, and the KV to be replaced by
     // the incremented value
-    requireCoveringPermission("increment", c.getEnvironment(), increment.getRow(),
-      increment.getFamilyCellMap(), increment.getTimeRange().getMax(), false,
-      Action.WRITE);
+    requireCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
+        increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE);
     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
     if (bytes != null) {
       if (canPersistCellACLs) {

Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java?rev=1589594&r1=1589593&r2=1589594&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java Thu Apr 24 05:46:29 2014
@@ -169,7 +169,7 @@ public class SecureTestUtil {
         if (results != null && results.isEmpty()) {
           fail("Empty non null results from action for user '" + user.getShortName() + "'");
         }
-        assertEquals(results.size(), count);
+        assertEquals(count, results.size());
       }
     } catch (AccessDeniedException ade) {
       fail("Expected action to pass for user '" + user.getShortName() + "' but was denied");

Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java?rev=1589594&r1=1589593&r2=1589594&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java Thu Apr 24 05:46:29 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
@@ -69,7 +70,8 @@ public class TestCellACLWithMultipleVers
   @Rule
   public TestTableName TEST_TABLE = new TestTableName();
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
+  private static final byte[] TEST_FAMILY1 = Bytes.toBytes("f1");
+  private static final byte[] TEST_FAMILY2 = Bytes.toBytes("f2");
   private static final byte[] TEST_ROW = Bytes.toBytes("cellpermtest");
   private static final byte[] TEST_Q1 = Bytes.toBytes("q1");
   private static final byte[] TEST_Q2 = Bytes.toBytes("q2");
@@ -130,7 +132,11 @@ public class TestCellACLWithMultipleVers
     // Create the test table (owner added to the _acl_ table)
     HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
     HTableDescriptor htd = new HTableDescriptor(TEST_TABLE.getTableName());
-    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY);
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY1);
+    hcd.setMaxVersions(4);
+    htd.setOwner(USER_OWNER);
+    htd.addFamily(hcd);
+    hcd = new HColumnDescriptor(TEST_FAMILY2);
     hcd.setMaxVersions(4);
     htd.setOwner(USER_OWNER);
     htd.addFamily(hcd);
@@ -149,20 +155,20 @@ public class TestCellACLWithMultipleVers
         try {
           Put p;
           // with ro ACL
-          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
           t.put(p);
           // with ro ACL
-          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ));
           t.put(p);
-          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
           t.put(p);
-          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ));
           t.put(p);
-          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
           t.put(p);
         } finally {
@@ -213,13 +219,13 @@ public class TestCellACLWithMultipleVers
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           Put p;
-          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
           t.put(p);
-          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ));
           t.put(p);
-          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
           t.put(p);
         } finally {
@@ -253,15 +259,15 @@ public class TestCellACLWithMultipleVers
         try {
           // with rw ACL for "user1"
           Put p = new Put(TEST_ROW1);
-          p.add(TEST_FAMILY, TEST_Q1, ZERO);
-          p.add(TEST_FAMILY, TEST_Q2, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q1, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q2, ZERO);
           p.setACL(user1.getShortName(), new Permission(Permission.Action.READ,
               Permission.Action.WRITE));
           t.put(p);
           // with rw ACL for "user1"
           p = new Put(TEST_ROW2);
-          p.add(TEST_FAMILY, TEST_Q1, ZERO);
-          p.add(TEST_FAMILY, TEST_Q2, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q1, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q2, ZERO);
           p.setACL(user1.getShortName(), new Permission(Permission.Action.READ,
               Permission.Action.WRITE));
           t.put(p);
@@ -279,8 +285,8 @@ public class TestCellACLWithMultipleVers
         try {
           // with rw ACL for "user1" and "user2"
           Put p = new Put(TEST_ROW1);
-          p.add(TEST_FAMILY, TEST_Q1, ZERO);
-          p.add(TEST_FAMILY, TEST_Q2, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q1, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q2, ZERO);
           Map<String, Permission> perms = new HashMap<String, Permission>();
           perms.put(user1.getShortName(), new Permission(Permission.Action.READ,
               Permission.Action.WRITE));
@@ -290,8 +296,8 @@ public class TestCellACLWithMultipleVers
           t.put(p);
           // with rw ACL for "user1" and "user2"
           p = new Put(TEST_ROW2);
-          p.add(TEST_FAMILY, TEST_Q1, ZERO);
-          p.add(TEST_FAMILY, TEST_Q2, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q1, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q2, ZERO);
           p.setACL(perms);
           t.put(p);
         } finally {
@@ -309,8 +315,8 @@ public class TestCellACLWithMultipleVers
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           Delete d = new Delete(TEST_ROW1);
-          d.deleteColumns(TEST_FAMILY, TEST_Q1);
-          d.deleteColumns(TEST_FAMILY, TEST_Q2);
+          d.deleteColumns(TEST_FAMILY1, TEST_Q1);
+          d.deleteColumns(TEST_FAMILY1, TEST_Q2);
           t.delete(d);
         } finally {
           t.close();
@@ -326,8 +332,8 @@ public class TestCellACLWithMultipleVers
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           Delete d = new Delete(TEST_ROW2);
-          d.deleteColumns(TEST_FAMILY, TEST_Q1);
-          d.deleteColumns(TEST_FAMILY, TEST_Q2);
+          d.deleteColumns(TEST_FAMILY1, TEST_Q1);
+          d.deleteColumns(TEST_FAMILY1, TEST_Q2);
           t.delete(d);
           fail("user2 should not be allowed to delete the row");
         } catch (Exception e) {
@@ -345,7 +351,7 @@ public class TestCellACLWithMultipleVers
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           Delete d = new Delete(TEST_ROW2);
-          d.deleteFamily(TEST_FAMILY);
+          d.deleteFamily(TEST_FAMILY1);
           t.delete(d);
         } finally {
           t.close();
@@ -366,13 +372,13 @@ public class TestCellACLWithMultipleVers
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           // Store read only ACL at a future time
-          Put p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1,
+          Put p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1,
             EnvironmentEdgeManager.currentTimeMillis() + 1000000,
             ZERO);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ));
           t.put(p);
           // Store a read write ACL without a timestamp, server will use current time
-          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q2, ONE);
+          p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q2, ONE);
           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
             Permission.Action.WRITE));
           t.put(p);
@@ -388,7 +394,7 @@ public class TestCellACLWithMultipleVers
     AccessTestAction getQ1 = new AccessTestAction() {
       @Override
       public Object run() throws Exception {
-        Get get = new Get(TEST_ROW).addColumn(TEST_FAMILY, TEST_Q1);
+        Get get = new Get(TEST_ROW).addColumn(TEST_FAMILY1, TEST_Q1);
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           return t.get(get).listCells();
@@ -401,7 +407,7 @@ public class TestCellACLWithMultipleVers
     AccessTestAction getQ2 = new AccessTestAction() {
       @Override
       public Object run() throws Exception {
-        Get get = new Get(TEST_ROW).addColumn(TEST_FAMILY, TEST_Q2);
+        Get get = new Get(TEST_ROW).addColumn(TEST_FAMILY1, TEST_Q2);
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           return t.get(get).listCells();
@@ -421,7 +427,7 @@ public class TestCellACLWithMultipleVers
     AccessTestAction deleteFamily = new AccessTestAction() {
       @Override
       public Object run() throws Exception {
-        Delete delete = new Delete(TEST_ROW).deleteFamily(TEST_FAMILY);
+        Delete delete = new Delete(TEST_ROW).deleteFamily(TEST_FAMILY1);
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           t.delete(delete);
@@ -452,8 +458,8 @@ public class TestCellACLWithMultipleVers
         try {
           // This version (TS = 123) with rw ACL for USER_OTHER and USER_OTHER2
           Put p = new Put(TEST_ROW);
-          p.add(TEST_FAMILY, TEST_Q1, 123L, ZERO);
-          p.add(TEST_FAMILY, TEST_Q2, 123L, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q1, 123L, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q2, 123L, ZERO);
           Map<String, Permission> perms = new HashMap<String, Permission>();
           perms.put(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
             Permission.Action.WRITE));
@@ -464,8 +470,8 @@ public class TestCellACLWithMultipleVers
 
           // This version (TS = 125) with rw ACL for USER_OTHER
           p = new Put(TEST_ROW);
-          p.add(TEST_FAMILY, TEST_Q1, 125L, ONE);
-          p.add(TEST_FAMILY, TEST_Q2, 125L, ONE);
+          p.add(TEST_FAMILY1, TEST_Q1, 125L, ONE);
+          p.add(TEST_FAMILY1, TEST_Q2, 125L, ONE);
           perms = new HashMap<String, Permission>();
           perms.put(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
             Permission.Action.WRITE));
@@ -474,8 +480,8 @@ public class TestCellACLWithMultipleVers
 
           // This version (TS = 127) with rw ACL for USER_OTHER
           p = new Put(TEST_ROW);
-          p.add(TEST_FAMILY, TEST_Q1, 127L, TWO);
-          p.add(TEST_FAMILY, TEST_Q2, 127L, TWO);
+          p.add(TEST_FAMILY1, TEST_Q1, 127L, TWO);
+          p.add(TEST_FAMILY1, TEST_Q2, 127L, TWO);
           perms = new HashMap<String, Permission>();
           perms.put(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
             Permission.Action.WRITE));
@@ -496,7 +502,7 @@ public class TestCellACLWithMultipleVers
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           Delete d = new Delete(TEST_ROW, 124L);
-          d.deleteColumns(TEST_FAMILY, TEST_Q1);
+          d.deleteColumns(TEST_FAMILY1, TEST_Q1);
           t.delete(d);
         } finally {
           t.close();
@@ -512,8 +518,292 @@ public class TestCellACLWithMultipleVers
         HTable t = new HTable(conf, TEST_TABLE.getTableName());
         try {
           Delete d = new Delete(TEST_ROW);
-          d.deleteColumns(TEST_FAMILY, TEST_Q2, 124L);
+          d.deleteColumns(TEST_FAMILY1, TEST_Q2, 124L);
+          t.delete(d);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testCellPermissionsWithDeleteExactVersion() throws Exception {
+    final byte[] TEST_ROW1 = Bytes.toBytes("r1");
+    final byte[] TEST_Q1 = Bytes.toBytes("q1");
+    final byte[] TEST_Q2 = Bytes.toBytes("q2");
+    final byte[] ZERO = Bytes.toBytes(0L);
+
+    final User user1 = User.createUserForTesting(conf, "user1", new String[0]);
+    final User user2 = User.createUserForTesting(conf, "user2", new String[0]);
+
+    verifyAllowed(new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Map<String, Permission> permsU1andOwner = new HashMap<String, Permission>();
+          permsU1andOwner.put(user1.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          permsU1andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          Map<String, Permission> permsU2andOwner = new HashMap<String, Permission>();
+          permsU2andOwner.put(user2.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          permsU2andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          Put p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q1, 123, ZERO);
+          p.setACL(permsU1andOwner);
+          t.put(p);
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q2, 123, ZERO);
+          p.setACL(permsU2andOwner);
+          t.put(p);
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY2, TEST_Q1, 123, ZERO);
+          p.add(TEST_FAMILY2, TEST_Q2, 123, ZERO);
+          p.setACL(permsU2andOwner);
+          t.put(p);
+
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY2, TEST_Q1, 125, ZERO);
+          p.add(TEST_FAMILY2, TEST_Q2, 125, ZERO);
+          p.setACL(permsU1andOwner);
+          t.put(p);
+
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q1, 127, ZERO);
+          p.setACL(permsU2andOwner);
+          t.put(p);
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q2, 127, ZERO);
+          p.setACL(permsU1andOwner);
+          t.put(p);
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY2, TEST_Q1, 129, ZERO);
+          p.add(TEST_FAMILY2, TEST_Q2, 129, ZERO);
+          p.setACL(permsU1andOwner);
+          t.put(p);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    }, USER_OWNER);
+
+    // user1 should be allowed to do this delete, as user1 has write permission on every cell
+    // version that the delete covers.
+    user1.runAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Delete d = new Delete(TEST_ROW1);
+          d.deleteColumn(TEST_FAMILY1, TEST_Q1, 123);
+          d.deleteColumn(TEST_FAMILY1, TEST_Q2);
+          d.deleteFamilyVersion(TEST_FAMILY2, 125);
+          t.delete(d);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    });
+
+    user2.runAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Delete d = new Delete(TEST_ROW1, 127);
+          d.deleteColumns(TEST_FAMILY1, TEST_Q1);
+          d.deleteColumns(TEST_FAMILY1, TEST_Q2);
+          d.deleteFamily(TEST_FAMILY2, 129);
           t.delete(d);
+          fail("user2 can not do the delete");
+        } catch (Exception e) {
+
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testCellPermissionsForIncrementWithMultipleVersions() throws Exception {
+    final byte[] TEST_ROW1 = Bytes.toBytes("r1");
+    final byte[] TEST_Q1 = Bytes.toBytes("q1");
+    final byte[] TEST_Q2 = Bytes.toBytes("q2");
+    final byte[] ZERO = Bytes.toBytes(0L);
+
+    final User user1 = User.createUserForTesting(conf, "user1", new String[0]);
+    final User user2 = User.createUserForTesting(conf, "user2", new String[0]);
+
+    verifyAllowed(new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Map<String, Permission> permsU1andOwner = new HashMap<String, Permission>();
+          permsU1andOwner.put(user1.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          permsU1andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          Map<String, Permission> permsU2andOwner = new HashMap<String, Permission>();
+          permsU2andOwner.put(user2.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          permsU2andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          Put p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q1, 123, ZERO);
+          p.setACL(permsU1andOwner);
+          t.put(p);
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q2, 123, ZERO);
+          p.setACL(permsU2andOwner);
+          t.put(p);
+
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q1, 127, ZERO);
+          p.setACL(permsU2andOwner);
+          t.put(p);
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q2, 127, ZERO);
+          p.setACL(permsU1andOwner);
+          t.put(p);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    }, USER_OWNER);
+
+    // Increment considers the TimeRange set on it.
+    user1.runAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Increment inc = new Increment(TEST_ROW1);
+          inc.setTimeRange(0, 123);
+          inc.addColumn(TEST_FAMILY1, TEST_Q1, 2L);
+          t.increment(inc);
+          t.incrementColumnValue(TEST_ROW1, TEST_FAMILY1, TEST_Q2, 1L);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    });
+
+    user2.runAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Increment inc = new Increment(TEST_ROW1);
+          inc.setTimeRange(0, 127);
+          inc.addColumn(TEST_FAMILY1, TEST_Q2, 2L);
+          t.increment(inc);
+          fail();
+        } catch (Exception e) {
+
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testCellPermissionsForPutWithMultipleVersions() throws Exception {
+    final byte[] TEST_ROW1 = Bytes.toBytes("r1");
+    final byte[] TEST_Q1 = Bytes.toBytes("q1");
+    final byte[] TEST_Q2 = Bytes.toBytes("q2");
+    final byte[] ZERO = Bytes.toBytes(0L);
+
+    final User user1 = User.createUserForTesting(conf, "user1", new String[0]);
+    final User user2 = User.createUserForTesting(conf, "user2", new String[0]);
+
+    verifyAllowed(new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Map<String, Permission> permsU1andOwner = new HashMap<String, Permission>();
+          permsU1andOwner.put(user1.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          permsU1andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          Map<String, Permission> permsU2andOwner = new HashMap<String, Permission>();
+          permsU2andOwner.put(user2.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          permsU2andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          Put p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q1, 123, ZERO);
+          p.setACL(permsU1andOwner);
+          t.put(p);
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q2, 123, ZERO);
+          p.setACL(permsU2andOwner);
+          t.put(p);
+
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q1, 127, ZERO);
+          p.setACL(permsU2andOwner);
+          t.put(p);
+          p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q2, 127, ZERO);
+          p.setACL(permsU1andOwner);
+          t.put(p);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    }, USER_OWNER);
+
+    // New Put with the TEST_Q1 column at TS=125. This covers the old cell with TS 123, on which
+    // user1 has RW permission. TEST_Q2 is put with the latest TS, so it covers the old cell with
+    // TS 127, on which user1 has RW permission too.
+    user1.runAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Put p = new Put(TEST_ROW1);
+          p.add(TEST_FAMILY1, TEST_Q1, 125, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q2, ZERO);
+          p.setACL(user2.getShortName(), new Permission(Permission.Action.READ,
+              Permission.Action.WRITE));
+          t.put(p);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    });
+
+    // Should be denied.
+    user2.runAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Put p = new Put(TEST_ROW1);
+          // column Q1 covers version at 123 for which user2 does not have permission
+          p.add(TEST_FAMILY1, TEST_Q1, 124, ZERO);
+          p.add(TEST_FAMILY1, TEST_Q2, ZERO);
+          t.put(p);
+          fail();
+        } catch (Exception e) {
+
         } finally {
           t.close();
         }