You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2014/04/05 10:20:09 UTC

svn commit: r1585020 - in /hbase/branches/0.98/hbase-server/src: main/java/org/apache/hadoop/hbase/security/access/ test/java/org/apache/hadoop/hbase/security/access/

Author: ramkrishna
Date: Sat Apr  5 08:20:08 2014
New Revision: 1585020

URL: http://svn.apache.org/r1585020
Log:
HBASE-10899-[AccessController] Apply MAX_VERSIONS from schema or request when scanning (Ram)

Added:
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java
Modified:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java?rev=1585020&r1=1585019&r2=1585020&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java Sat Apr  5 08:20:08 2014
@@ -18,11 +18,18 @@
 
 package org.apache.hadoop.hbase.security.access;
 
+import java.io.IOException;
+import java.util.Map;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.SimpleByteRange;
 
 /**
  * <strong>NOTE: for internal use only by AccessController implementation</strong>
@@ -46,6 +53,11 @@ class AccessControlFilter extends Filter
   private User user;
   private boolean isSystemTable;
   private boolean cellFirstStrategy;
+  private Map<ByteRange, Integer> cfVsMaxVersions;
+  private int familyMaxVersions;
+  private int currentVersions;
+  private ByteRange prevFam;
+  private ByteRange prevQual;
 
   /**
    * For Writable
@@ -54,12 +66,15 @@ class AccessControlFilter extends Filter
   }
 
   AccessControlFilter(TableAuthManager mgr, User ugi, TableName tableName,
-      boolean cellFirstStrategy) {
+      boolean cellFirstStrategy, Map<ByteRange, Integer> cfVsMaxVersions) {
     authManager = mgr;
     table = tableName;
     user = ugi;
     isSystemTable = tableName.isSystemTable();
     this.cellFirstStrategy = cellFirstStrategy;
+    this.cfVsMaxVersions = cfVsMaxVersions;
+    this.prevFam = new SimpleByteRange();
+    this.prevQual = new SimpleByteRange();
   }
 
   @Override
@@ -67,6 +82,27 @@ class AccessControlFilter extends Filter
     if (isSystemTable) {
       return ReturnCode.INCLUDE;
     }
+    if (prevFam.getBytes() == null
+        || (Bytes.compareTo(prevFam.getBytes(), prevFam.getOffset(), prevFam.getLength(),
+            cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) != 0)) {
+      prevFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+      // Similar to VisibilityLabelFilter. NOTE(review): unboxing NPEs if the family is unmapped.
+      familyMaxVersions = cfVsMaxVersions.get(prevFam);
+      // Family has changed: unset prevQual so the qualifier check below restarts the count.
+      prevQual.unset();
+    }
+    if (prevQual.getBytes() == null
+        || (Bytes.compareTo(prevQual.getBytes(), prevQual.getOffset(),
+            prevQual.getLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
+            cell.getQualifierLength()) != 0)) {
+      prevQual.set(cell.getQualifierArray(), cell.getQualifierOffset(),
+          cell.getQualifierLength());
+      currentVersions = 0;
+    }
+    currentVersions++;
+    if (currentVersions > familyMaxVersions) {
+      return ReturnCode.SKIP;
+    }
     if (authManager.authorize(user, table, cell, cellFirstStrategy, Permission.Action.READ)) {
       return ReturnCode.INCLUDE;
     }
@@ -76,6 +112,14 @@ class AccessControlFilter extends Filter
     return ReturnCode.SKIP;
   }
 
+  /**
+   * Clears the version-tracking state: the remembered qualifier and family ranges are
+   * unset and both version counters are put back to zero.
+   */
+  @Override
+  public void reset() throws IOException {
+    prevQual.unset();
+    prevFam.unset();
+    currentVersions = 0;
+    familyMaxVersions = 0;
+  }
+
   /**
    * @return The filter serialized using pb
    */

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java?rev=1585020&r1=1585019&r2=1585020&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java Sat Apr  5 08:20:08 2014
@@ -18,6 +18,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -25,11 +26,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,21 +33,22 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -59,12 +56,20 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Query;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.coprocessor.*;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.master.MasterServices;
@@ -72,21 +77,24 @@ import org.apache.hadoop.hbase.master.Re
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.SimpleByteRange;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
@@ -94,8 +102,10 @@ import com.google.common.collect.ListMul
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Sets;
-
-import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
 
 /**
  * Provides basic authorization checks for data access and administrative
@@ -642,12 +652,19 @@ public class AccessController extends Ba
     if (filter != null && filter instanceof AccessControlFilter) {
       return;
     }
+    Map<ByteRange, Integer> cfVsMaxVersions = new HashMap<ByteRange, Integer>();
+    HRegion region = c.getEnvironment().getRegion();
+    for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
+      cfVsMaxVersions.put(new SimpleByteRange(hcd.getName()), hcd.getMaxVersions());
+    }
     Filter newFilter = (filter != null)
       ? new FilterList(FilterList.Operator.MUST_PASS_ALL,
           Lists.newArrayList(
-            new AccessControlFilter(authManager, activeUser, tableName, cellFirstStrategy),
+              new AccessControlFilter(authManager, activeUser, tableName,
+            cellFirstStrategy, cfVsMaxVersions),
             filter))
-      : new AccessControlFilter(authManager, activeUser, tableName, cellFirstStrategy);
+      : new AccessControlFilter(authManager, activeUser, tableName,
+          cellFirstStrategy, cfVsMaxVersions);
     query.setFilter(newFilter);
   }
 

Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java?rev=1585020&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestCellACLWithMultipleVersions.java Sat Apr  5 08:20:08 2014
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.access;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.TestTableName;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+@Category(MediumTests.class)
+public class TestCellACLWithMultipleVersions extends SecureTestUtil {
+  private static final Log LOG = LogFactory.getLog(TestCellACLWithMultipleVersions.class);
+
+  // Raise the log4j level of the access-control classes to TRACE for this test class.
+  static {
+    Logger.getLogger(AccessController.class).setLevel(Level.TRACE);
+    Logger.getLogger(AccessControlFilter.class).setLevel(Level.TRACE);
+    Logger.getLogger(TableAuthManager.class).setLevel(Level.TRACE);
+  }
+
+  // Supplies the table name used by setUp/tearDown and the tests
+  // (presumably one name per test method -- confirm against TestTableName).
+  @Rule
+  public TestTableName TEST_TABLE = new TestTableName();
+  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  // Assigned in setupBeforeClass from TEST_UTIL.getConfiguration().
+  private static Configuration conf;
+
+  // user is table owner. will have all permissions on table
+  private static User USER_OWNER;
+  // The single column family ("f1") of the test table.
+  private static byte[] TEST_FAMILY = Bytes.toBytes("f1");
+
+  // The AccessController instance found on the master coprocessor host in setupBeforeClass.
+  private static AccessController ACCESS_CONTROLLER;
+
+  /**
+   * Checks that AccessController is named in the master, region and region server
+   * system coprocessor lists of {@code conf}, in that order.
+   *
+   * @param conf configuration to inspect
+   * @throws RuntimeException if any of the three lists is unset or does not contain the
+   *           AccessController class name
+   */
+  static void verifyConfiguration(Configuration conf) {
+    String[] coprocessorKeys = {
+        CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+        CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY };
+    for (String key : coprocessorKeys) {
+      String coprocessors = conf.get(key);
+      // An unset key yields null; report it as "missing" instead of failing with an NPE.
+      if (coprocessors == null || !coprocessors.contains(AccessController.class.getName())) {
+        throw new RuntimeException("AccessController is missing from a system coprocessor list");
+      }
+    }
+  }
+
+  /**
+   * One-time setup: configures cleaner plugins and security, starts the mini cluster,
+   * loads AccessController on the master coprocessor host, creates an environment for it
+   * on the master and on region server 0, waits for the ACL table, and creates the
+   * table-owner test user.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // setup configuration
+    conf = TEST_UTIL.getConfiguration();
+    conf.set("hbase.master.hfilecleaner.plugins",
+        "org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner,"
+            + "org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner");
+    conf.set("hbase.master.logcleaner.plugins",
+        "org.apache.hadoop.hbase.master.snapshot.SnapshotLogCleaner");
+    // Enable security
+    SecureTestUtil.enableSecurity(conf);
+    // Verify enableSecurity sets up what we require
+    verifyConfiguration(conf);
+
+    // Enable EXEC permission checking
+    conf.setBoolean(AccessController.EXEC_PERMISSION_CHECKS_KEY, true);
+
+    // The configuration above must be in place before the cluster starts.
+    TEST_UTIL.startMiniCluster();
+    MasterCoprocessorHost cpHost = TEST_UTIL.getMiniHBaseCluster()
+        .getMaster().getCoprocessorHost();
+    cpHost.load(AccessController.class, Coprocessor.PRIORITY_HIGHEST, conf);
+    // Keep a handle on the loaded instance; it is reused for both environments below.
+    ACCESS_CONTROLLER = (AccessController) cpHost.findCoprocessor(AccessController.class.getName());
+    cpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
+        Coprocessor.PRIORITY_HIGHEST, 1, conf);
+    // Only region server 0 gets an environment here.
+    RegionServerCoprocessorHost rsHost = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)
+        .getCoprocessorHost();
+    rsHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
+        Coprocessor.PRIORITY_HIGHEST, 1, conf);
+
+    // Wait for the ACL table to become available
+    TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME.getName());
+
+    // create the table-owner test user (no groups)
+    USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
+  }
+
+  /** One-time teardown: shuts down the mini cluster started in setupBeforeClass. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Per-test setup: creates the test table, owned by USER_OWNER, with the single family
+   * TEST_FAMILY configured for a maximum of 4 versions, and waits until it is enabled.
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Create the test table (owner added to the _acl_ table)
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    HTableDescriptor htd = new HTableDescriptor(TEST_TABLE.getTableName());
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY);
+    // The expected cell counts in testCellPermissionwithVersions depend on this limit.
+    hcd.setMaxVersions(4);
+    htd.setOwner(USER_OWNER);
+    htd.addFamily(hcd);
+    // The second argument is presumably the split-key list (one key, "s") -- confirm.
+    admin.createTable(htd, new byte[][] { Bytes.toBytes("s") });
+    TEST_UTIL.waitTableEnabled(TEST_TABLE.getTableName().getName());
+  }
+
+  /**
+   * Writes several versions of one cell whose cell-level ACL for a second user alternates
+   * between WRITE and READ, then reads the row back as that user asking for up to 10
+   * versions and checks how many cells come back (2 for TEST_ROW, 1 for TEST_ROW1).
+   */
+  @Test
+  public void testCellPermissionwithVersions() throws Exception {
+    // table/column/qualifier level permissions
+    final byte[] TEST_ROW = Bytes.toBytes("cellpermtest");
+    final byte[] TEST_ROW1 = Bytes.toBytes("cellpermtest1");
+    final byte[] TEST_Q1 = Bytes.toBytes("q1");
+    // test value
+    final byte[] ZERO = Bytes.toBytes(0L);
+
+    /* ---- Setup ---- */
+
+    // additional test user
+    final User userOther = User.createUserForTesting(conf, "user_check_cell_perms_other",
+        new String[0]);
+
+    // As the table owner, write five versions of TEST_ROW/TEST_FAMILY:TEST_Q1. Every put
+    // carries a cell-level ACL for userOther, alternating WRITE, READ, WRITE, READ, WRITE.
+    verifyAllowed(new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Put p;
+          // WRITE ACL for userOther
+          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p.setACL(userOther.getShortName(), new Permission(Permission.Action.WRITE));
+          t.put(p);
+          // READ ACL for userOther
+          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p.setACL(userOther.getShortName(), new Permission(Permission.Action.READ));
+          t.put(p);
+          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p.setACL(userOther.getShortName(), new Permission(Permission.Action.WRITE));
+          t.put(p);
+          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p.setACL(userOther.getShortName(), new Permission(Permission.Action.READ));
+          t.put(p);
+          p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p.setACL(userOther.getShortName(), new Permission(Permission.Action.WRITE));
+          t.put(p);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    }, USER_OWNER);
+
+    /* ---- Gets ---- */
+
+    // Reads TEST_ROW, requesting up to 10 versions (more than the family's limit of 4).
+    AccessTestAction getQ1 = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        Get get = new Get(TEST_ROW);
+        get.setMaxVersions(10);
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          return t.get(get).listCells();
+        } finally {
+          t.close();
+        }
+      }
+    };
+
+    // Same read for TEST_ROW1; it is only executed after TEST_ROW1 is populated below.
+    AccessTestAction get2 = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        Get get = new Get(TEST_ROW1);
+        get.setMaxVersions(10);
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          return t.get(get).listCells();
+        } finally {
+          t.close();
+        }
+      }
+    };
+    // Confirm special read access set at cell level. Expect 2 cells: with the family capped
+    // at 4 versions, presumably only the newest four puts (READ, WRITE, READ, WRITE) count,
+    // and two of those grant READ to userOther.
+
+    verifyAllowed(userOther, getQ1, 2);
+
+    // As the table owner, write three versions of TEST_ROW1/TEST_FAMILY:TEST_Q1 with
+    // cell-level ACLs for userOther of WRITE, READ, WRITE.
+    verifyAllowed(new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        HTable t = new HTable(conf, TEST_TABLE.getTableName());
+        try {
+          Put p;
+          p = new Put(TEST_ROW1).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p.setACL(userOther.getShortName(), new Permission(Permission.Action.WRITE));
+          t.put(p);
+          p = new Put(TEST_ROW1).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p.setACL(userOther.getShortName(), new Permission(Permission.Action.READ));
+          t.put(p);
+          p = new Put(TEST_ROW1).add(TEST_FAMILY, TEST_Q1, ZERO);
+          p.setACL(userOther.getShortName(), new Permission(Permission.Action.WRITE));
+          t.put(p);
+        } finally {
+          t.close();
+        }
+        return null;
+      }
+    }, USER_OWNER);
+    // Confirm special read access set at cell level. Expect 1 cell: only one of the three
+    // versions grants READ to userOther.
+
+    verifyAllowed(userOther, get2, 1);
+  }
+
+  /**
+   * Runs {@code action} as {@code user} and, when the action returns a {@link List},
+   * checks that the list is non-empty and holds exactly {@code count} elements. Any other
+   * return value (including {@code null}) is accepted without further checks.
+   *
+   * @param user user to run the action as
+   * @param action action expected to be permitted for {@code user}
+   * @param count number of elements expected in a returned list
+   * @throws Exception anything other than AccessDeniedException raised by the action
+   */
+  public void verifyAllowed(User user, AccessTestAction action, int count) throws Exception {
+    try {
+      Object obj = user.runAs(action);
+      // instanceof is already false for null, so no separate null check is needed.
+      if (obj instanceof List<?>) {
+        List<?> results = (List<?>) obj;
+        if (results.isEmpty()) {
+          fail("Empty non null results from action for user '" + user.getShortName() + "'");
+        }
+        // JUnit's argument order is (expected, actual); keeps failure messages correct.
+        assertEquals(count, results.size());
+      }
+    } catch (AccessDeniedException ade) {
+      fail("Expected action to pass for user '" + user.getShortName() + "' but was denied");
+    }
+  }
+
+  /**
+   * Per-test teardown: deletes the test table (tolerating a table that is already gone)
+   * and asserts that no permissions for it remain in the ACL lists.
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Delete the test table
+    try {
+      TEST_UTIL.deleteTable(TEST_TABLE.getTableName());
+    } catch (TableNotFoundException ex) {
+      // Test deleted the table, no problem
+      LOG.info("Test deleted table " + TEST_TABLE.getTableName());
+    }
+    // Dropping the table is expected to leave no table permissions behind.
+    assertEquals(0, AccessControlLists.getTablePermissions(conf, TEST_TABLE.getTableName()).size());
+  }
+}