You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2014/08/20 11:17:58 UTC

[2/3] HBASE-11553 Abstract visibility label related services into an interface.

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index fb62c2b..e52da1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -22,20 +22,13 @@ import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SANITY_CHEC
 import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SUCCESS;
 import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
 import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
-import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
-import static org.apache.hadoop.hbase.security.visibility.VisibilityUtils.SYSTEM_LABEL;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,6 +48,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -69,10 +63,8 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
@@ -80,7 +72,6 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -105,16 +96,8 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
 import org.apache.hadoop.hbase.security.access.AccessController;
-import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode;
-import org.apache.hadoop.hbase.security.visibility.expression.LeafExpressionNode;
-import org.apache.hadoop.hbase.security.visibility.expression.NonLeafExpressionNode;
-import org.apache.hadoop.hbase.security.visibility.expression.Operator;
-import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
@@ -132,19 +115,6 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     VisibilityLabelsService.Interface, CoprocessorService {
 
   private static final Log LOG = LogFactory.getLog(VisibilityController.class);
-  private static final byte[] DUMMY_VALUE = new byte[0];
-  // "system" label is having an ordinal value 1.
-  private static final int SYSTEM_LABEL_ORDINAL = 1;
-  private static final Tag[] LABELS_TABLE_TAGS = new Tag[1];
-
-  private final ExpressionParser expressionParser = new ExpressionParser();
-  private final ExpressionExpander expressionExpander = new ExpressionExpander();
-  private VisibilityLabelsManager visibilityManager;
-  // defined only for Endpoint implementation, so it can have way to access region services.
-  private RegionCoprocessorEnvironment regionEnv;
-  private List<ScanLabelGenerator> scanLabelGenerators;
-
-  private volatile int ordinalCounter = -1;
   // flags if we are running on a region of the 'labels' table
   private boolean labelsRegion = false;
   // Flag denoting whether AcessController is available or not.
@@ -157,23 +127,13 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
       new MapMaker().weakKeys().makeMap();
 
   List<String> superUsers;
-
-  static {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(baos);
-    try {
-      StreamUtils.writeRawVInt32(dos, SYSTEM_LABEL_ORDINAL);
-    } catch (IOException e) {
-      // We write to a byte array. No Exception can happen.
-    }
-    LABELS_TABLE_TAGS[0] = new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray());
-  }
+  private VisibilityLabelService visibilityLabelService;
 
   // Add to this list if there are any reserved tag types
-  private static ArrayList<Byte> reservedVisTagTypes = new ArrayList<Byte>();
+  private static ArrayList<Byte> RESERVED_VIS_TAG_TYPES = new ArrayList<Byte>();
   static {
-    reservedVisTagTypes.add(VisibilityUtils.VISIBILITY_TAG_TYPE);
-    reservedVisTagTypes.add(VisibilityUtils.VISIBILITY_EXP_SERIALIZATION_TAG_TYPE);
+    RESERVED_VIS_TAG_TYPES.add(TagType.VISIBILITY_TAG_TYPE);
+    RESERVED_VIS_TAG_TYPES.add(TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE);
   }
 
   @Override
@@ -184,34 +144,16 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
         + " is required to persist visibility labels. Consider setting " + HFile.FORMAT_VERSION_KEY
         + " accordingly.");
     }
-    ZooKeeperWatcher zk = null;
-    if (env instanceof MasterCoprocessorEnvironment) {
-      // if running on HMaster
-      MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
-      zk = mEnv.getMasterServices().getZooKeeper();
-    } else if (env instanceof RegionCoprocessorEnvironment) {
-      // if running at region
-      regionEnv = (RegionCoprocessorEnvironment) env;
-      zk = regionEnv.getRegionServerServices().getZooKeeper();
-    } else if (env instanceof RegionServerCoprocessorEnvironment) {
+    if (env instanceof RegionServerCoprocessorEnvironment) {
       throw new RuntimeException(
           "Visibility controller should not be configured as " +
           "'hbase.coprocessor.regionserver.classes'.");
     }
 
-    // If zk is null or IOException while obtaining auth manager,
-    // throw RuntimeException so that the coprocessor is unloaded.
-    if (zk == null) {
-      throw new RuntimeException("Error obtaining VisibilityLabelsManager, zk found null.");
-    }
-    try {
-      this.visibilityManager = VisibilityLabelsManager.get(zk, this.conf);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Error obtaining VisibilityLabelsManager", ioe);
-    }
     if (env instanceof RegionCoprocessorEnvironment) {
-      // ScanLabelGenerator to be instantiated only with Region Observer.
-      scanLabelGenerators = VisibilityUtils.getScanLabelGenerators(this.conf);
+      // VisibilityLabelService to be instantiated only with Region Observer.
+      visibilityLabelService = VisibilityLabelServiceManager.getInstance()
+          .getVisibilityLabelService(this.conf);
     }
     this.superUsers = getSystemAndSuperUsers();
   }
@@ -292,73 +234,31 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     if (e.getEnvironment().getRegion().getRegionInfo().getTable().equals(LABELS_TABLE_NAME)) {
       this.labelsRegion = true;
       this.acOn = CoprocessorHost.getLoadedCoprocessors().contains(AccessController.class.getName());
+      // Defer the init of VisibilityLabelService on labels region until it is in recovering state.
       if (!e.getEnvironment().getRegion().isRecovering()) {
-        initialize(e);
+        initVisibilityLabelService(e.getEnvironment());
       }
     } else {
       checkAuths = e.getEnvironment().getConfiguration()
           .getBoolean(VisibilityConstants.CHECK_AUTHS_FOR_MUTATION, false);
-      this.initialized = true;
+      initVisibilityLabelService(e.getEnvironment());
     }
   }
 
   @Override
   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> e) {
     if (this.labelsRegion) {
-      initialize(e);
+      initVisibilityLabelService(e.getEnvironment());
     }
   }
 
-  private void initialize(ObserverContext<RegionCoprocessorEnvironment> e) {
+  private void initVisibilityLabelService(RegionCoprocessorEnvironment env) {
     try {
-      Pair<Map<String, Integer>, Map<String, List<Integer>>> labelsAndUserAuths =
-          extractLabelsAndAuths(getExistingLabelsWithAuths());
-      Map<String, Integer> labels = labelsAndUserAuths.getFirst();
-      Map<String, List<Integer>> userAuths = labelsAndUserAuths.getSecond();
-      // Add the "system" label if it is not added into the system yet
-      addSystemLabel(e.getEnvironment().getRegion(), labels, userAuths);
-      int ordinal = 1; // Ordinal 1 is reserved for "system" label.
-      for (Integer i : labels.values()) {
-        if (i > ordinal) {
-          ordinal = i;
-        }
-      }
-      this.ordinalCounter = ordinal + 1;
-      if (labels.size() > 0) {
-        // If there is no data need not write to zk
-        byte[] serialized = VisibilityUtils.getDataToWriteToZooKeeper(labels);
-        this.visibilityManager.writeToZookeeper(serialized, true);
-      }
-      if (userAuths.size() > 0) {
-        byte[] serialized = VisibilityUtils.getUserAuthsDataToWriteToZooKeeper(userAuths);
-        this.visibilityManager.writeToZookeeper(serialized, false);
-      }
-      initialized = true;
+      this.visibilityLabelService.init(env);
+      this.initialized = true;
     } catch (IOException ioe) {
-      LOG.error("Error while updating the zk with the exisiting labels data", ioe);
-    }
-  }
-
-  private void addSystemLabel(HRegion region, Map<String, Integer> labels,
-      Map<String, List<Integer>> userAuths) throws IOException {
-    if (!labels.containsKey(SYSTEM_LABEL)) {
-      Put p = new Put(Bytes.toBytes(SYSTEM_LABEL_ORDINAL));
-      p.addImmutable(LABELS_TABLE_FAMILY, LABEL_QUALIFIER, Bytes.toBytes(SYSTEM_LABEL));
-      // Set auth for "system" label for all super users.
-      for (String superUser : this.superUsers) {
-        p.addImmutable(
-            LABELS_TABLE_FAMILY, Bytes.toBytes(superUser), DUMMY_VALUE, LABELS_TABLE_TAGS);
-      }
-      region.put(p);
-      labels.put(SYSTEM_LABEL, SYSTEM_LABEL_ORDINAL);
-      for (String superUser : superUsers) {
-        List<Integer> auths = userAuths.get(superUser);
-        if (auths == null) {
-          auths = new ArrayList<Integer>(1);
-          userAuths.put(superUser, auths);
-        }
-        auths.add(SYSTEM_LABEL_ORDINAL);
-      }
+      LOG.error("Error while initializing VisibilityLabelService..", ioe);
+      throw new RuntimeException(ioe);
     }
   }
 
@@ -370,11 +270,6 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     }
     // TODO this can be made as a global LRU cache at HRS level?
     Map<String, List<Tag>> labelCache = new HashMap<String, List<Tag>>();
-    Set<Integer> auths = null;
-    User user = getActiveUser();
-    if (checkAuths && user != null && user.getShortName() != null) {
-      auths = this.visibilityManager.getAuthsAsOrdinals(user.getShortName());
-    }
     for (int i = 0; i < miniBatchOp.size(); i++) {
       Mutation m = miniBatchOp.getOperation(i);
       CellVisibility cellVisibility = null;
@@ -385,46 +280,48 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
             new OperationStatus(SANITY_CHECK_FAILURE, de.getMessage()));
         continue;
       }
-        boolean sanityFailure = false;
-        for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
-          if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
-            miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE,
-                "Mutation contains cell with reserved type tag"));
-            sanityFailure = true;
-            break;
-          }
+      boolean sanityFailure = false;
+      for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
+        if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
+          miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE,
+              "Mutation contains cell with reserved type tag"));
+          sanityFailure = true;
+          break;
         }
-        if (!sanityFailure) {
-          if (cellVisibility != null) {
-            String labelsExp = cellVisibility.getExpression();
-            List<Tag> visibilityTags = labelCache.get(labelsExp);
-            if (visibilityTags == null) {
-              try {
-                visibilityTags = createVisibilityTags(labelsExp, true, auths, user.getShortName());
-              } catch (ParseException e) {
-                miniBatchOp.setOperationStatus(i,
-                    new OperationStatus(SANITY_CHECK_FAILURE, e.getMessage()));
-              } catch (InvalidLabelException e) {
-                miniBatchOp.setOperationStatus(i,
-                    new OperationStatus(SANITY_CHECK_FAILURE, e.getMessage()));
-              }
+      }
+      if (!sanityFailure) {
+        if (cellVisibility != null) {
+          String labelsExp = cellVisibility.getExpression();
+          List<Tag> visibilityTags = labelCache.get(labelsExp);
+          if (visibilityTags == null) {
+            // Don't check user auths for labels with Mutations when the user is super user
+            boolean authCheck = this.checkAuths && !(isSystemOrSuperUser());
+            try {
+              visibilityTags = this.visibilityLabelService.createVisibilityExpTags(labelsExp, true,
+                  authCheck);
+            } catch (InvalidLabelException e) {
+              miniBatchOp.setOperationStatus(i,
+                  new OperationStatus(SANITY_CHECK_FAILURE, e.getMessage()));
             }
             if (visibilityTags != null) {
               labelCache.put(labelsExp, visibilityTags);
-              List<Cell> updatedCells = new ArrayList<Cell>();
-              for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
-                Cell cell = cellScanner.current();
-                List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
-                    cell.getTagsLength());
-                tags.addAll(visibilityTags);
-                Cell updatedCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
-                    cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
-                    cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
-                    cell.getQualifierLength(), cell.getTimestamp(), Type.codeToType(cell
-                        .getTypeByte()), cell.getValueArray(), cell.getValueOffset(),
-                    cell.getValueLength(), tags);
-                updatedCells.add(updatedCell);
-              }
+            }
+          }
+          if (visibilityTags != null) {
+            List<Cell> updatedCells = new ArrayList<Cell>();
+            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
+              Cell cell = cellScanner.current();
+              List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+                  cell.getTagsLength());
+              tags.addAll(visibilityTags);
+              Cell updatedCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
+                  cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
+                  cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
+                  cell.getQualifierLength(), cell.getTimestamp(), Type.codeToType(cell
+                      .getTypeByte()), cell.getValueArray(), cell.getValueOffset(),
+                  cell.getValueLength(), tags);
+              updatedCells.add(updatedCell);
+            }
             m.getFamilyCellMap().clear();
             // Clear and add new Cells to the Mutation.
             for (Cell cell : updatedCells) {
@@ -460,14 +357,14 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     if (cellVisibility != null) {
       String labelsExp = cellVisibility.getExpression();
       try {
-        visibilityTags = createVisibilityTags(labelsExp, false, null, null);
-      } catch (ParseException e) {
-        throw new IOException("Invalid cell visibility expression " + labelsExp, e);
+        visibilityTags = this.visibilityLabelService.createVisibilityExpTags(labelsExp, false,
+            false);
       } catch (InvalidLabelException e) {
         throw new IOException("Invalid cell visibility specified " + labelsExp, e);
       }
     }
-    get.setFilter(new DeleteVersionVisibilityExpressionFilter(visibilityTags));
+    get.setFilter(new DeleteVersionVisibilityExpressionFilter(visibilityTags,
+        VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT));
     List<Cell> result = ctx.getEnvironment().getRegion().get(get, false);
 
     if (result.size() < get.getMaxVersions()) {
@@ -490,89 +387,6 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     ctx.bypass();
   }
 
-  @Override
-  public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-    if (this.labelsRegion) {
-      // We will add to zookeeper here.
-      Pair<Map<String, Integer>, Map<String, List<Integer>>> labelsAndUserAuths =
-          extractLabelsAndAuths(getExistingLabelsWithAuths());
-      Map<String, Integer> existingLabels = labelsAndUserAuths.getFirst();
-      Map<String, List<Integer>> userAuths = labelsAndUserAuths.getSecond();
-      boolean isNewLabels = false;
-      boolean isUserAuthsChange = false;
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-        Mutation m = miniBatchOp.getOperation(i);
-        if (miniBatchOp.getOperationStatus(i).getOperationStatusCode() == SUCCESS) {
-          for (List<Cell> cells : m.getFamilyCellMap().values()) {
-            for (Cell cell : cells) {
-              int labelOrdinal = Bytes.toInt(cell.getRowArray(), cell.getRowOffset());
-              if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(),
-                      cell.getQualifierLength(), LABEL_QUALIFIER, 0,
-                      LABEL_QUALIFIER.length)) {
-                if (m instanceof Put) {
-                  existingLabels.put(
-                      Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
-                          cell.getValueLength()), labelOrdinal);
-                  isNewLabels = true;
-                }
-              } else {
-                String user = Bytes.toString(cell.getQualifierArray(),
-                    cell.getQualifierOffset(), cell.getQualifierLength());
-                List<Integer> auths = userAuths.get(user);
-                if (auths == null) {
-                  auths = new ArrayList<Integer>();
-                  userAuths.put(user, auths);
-                }
-                if (m instanceof Delete) {
-                  auths.remove(Integer.valueOf(labelOrdinal));
-                } else {
-                  auths.add(labelOrdinal);
-                }
-                isUserAuthsChange = true;
-              }
-            }
-          }
-        }
-      }
-      if (isNewLabels) {
-        byte[] serialized = VisibilityUtils.getDataToWriteToZooKeeper(existingLabels);
-        this.visibilityManager.writeToZookeeper(serialized, true);
-      }
-      if (isUserAuthsChange) {
-        byte[] serialized = VisibilityUtils.getUserAuthsDataToWriteToZooKeeper(userAuths);
-        this.visibilityManager.writeToZookeeper(serialized, false);
-      }
-    }
-  }
-
-  private Pair<Map<String, Integer>, Map<String, List<Integer>>> extractLabelsAndAuths(
-      List<List<Cell>> labelDetails) {
-    Map<String, Integer> labels = new HashMap<String, Integer>();
-    Map<String, List<Integer>> userAuths = new HashMap<String, List<Integer>>();
-    for (List<Cell> cells : labelDetails) {
-      for (Cell cell : cells) {
-        if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(),
-            cell.getQualifierLength(), LABEL_QUALIFIER, 0, LABEL_QUALIFIER.length)) {
-          labels.put(
-              Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()),
-              Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
-        } else {
-          // These are user cells who has authorization for this label
-          String user = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
-              cell.getQualifierLength());
-          List<Integer> auths = userAuths.get(user);
-          if (auths == null) {
-            auths = new ArrayList<Integer>();
-            userAuths.put(user, auths);
-          }
-          auths.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
-        }
-      }
-    }
-    return new Pair<Map<String, Integer>, Map<String, List<Integer>>>(labels, userAuths);
-  }
-
   // Checks whether cell contains any tag with type as VISIBILITY_TAG_TYPE.
   // This tag type is reserved and should not be explicitly set by user.
   private boolean checkForReservedVisibilityTagPresence(Cell cell) throws IOException {
@@ -586,7 +400,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
           cell.getTagsLength());
       while (tagsItr.hasNext()) {
-        if (reservedVisTagTypes.contains(tagsItr.next().getType())) {
+        if (RESERVED_VIS_TAG_TYPES.contains(tagsItr.next().getType())) {
           return false;
         }
       }
@@ -594,114 +408,29 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     return true;
   }
 
-  private List<Tag> createVisibilityTags(String visibilityLabelsExp, boolean addSerializationTag,
-      Set<Integer> auths, String userName) throws IOException, ParseException,
-      InvalidLabelException {
-    ExpressionNode node = null;
-    node = this.expressionParser.parse(visibilityLabelsExp);
-    node = this.expressionExpander.expand(node);
-    List<Tag> tags = new ArrayList<Tag>();
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(baos);
-    List<Integer> labelOrdinals = new ArrayList<Integer>();
-    // We will be adding this tag before the visibility tags and the presence of this
-    // tag indicates we are supporting deletes with cell visibility
-    if (addSerializationTag) {
-      tags.add(VisibilityUtils.VIS_SERIALIZATION_TAG);
-    }
-    if (node.isSingleNode()) {
-      getLabelOrdinals(node, labelOrdinals, auths, userName);
-      writeLabelOrdinalsToStream(labelOrdinals, dos);
-      tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray()));
-      baos.reset();
-    } else {
-      NonLeafExpressionNode nlNode = (NonLeafExpressionNode) node;
-      if (nlNode.getOperator() == Operator.OR) {
-        for (ExpressionNode child : nlNode.getChildExps()) {
-          getLabelOrdinals(child, labelOrdinals, auths, userName);
-          writeLabelOrdinalsToStream(labelOrdinals, dos);
-          tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray()));
-          baos.reset();
-          labelOrdinals.clear();
-        }
-      } else {
-        getLabelOrdinals(nlNode, labelOrdinals, auths, userName);
-        writeLabelOrdinalsToStream(labelOrdinals, dos);
-        tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray()));
-        baos.reset();
-      }
-    }
-    return tags;
-  }
-
-  private void writeLabelOrdinalsToStream(List<Integer> labelOrdinals, DataOutputStream dos)
-      throws IOException {
-    Collections.sort(labelOrdinals);
-    for (Integer labelOrdinal : labelOrdinals) {
-      StreamUtils.writeRawVInt32(dos, labelOrdinal);
-    }
-  }
-
-  private void getLabelOrdinals(ExpressionNode node, List<Integer> labelOrdinals,
-      Set<Integer> auths, String userName) throws IOException, InvalidLabelException {
-    if (node.isSingleNode()) {
-      String identifier = null;
-      int labelOrdinal = 0;
-      if (node instanceof LeafExpressionNode) {
-        identifier = ((LeafExpressionNode) node)
-            .getIdentifier();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("The identifier is "+identifier);
-        }
-        labelOrdinal = this.visibilityManager.getLabelOrdinal(identifier);
-        checkAuths(auths, userName, labelOrdinal, identifier);
-      } else {
-        // This is a NOT node.
-        LeafExpressionNode lNode = (LeafExpressionNode) ((NonLeafExpressionNode) node)
-            .getChildExps().get(0);
-        identifier = lNode.getIdentifier();
-        labelOrdinal = this.visibilityManager.getLabelOrdinal(identifier);
-        checkAuths(auths, userName, labelOrdinal, identifier);
-        labelOrdinal = -1 * labelOrdinal; // Store NOT node as -ve ordinal.
-      }
-      if (labelOrdinal == 0) {
-        throw new InvalidLabelException("Invalid visibility label " + identifier);
-      }
-      labelOrdinals.add(labelOrdinal);
-    } else {
-      List<ExpressionNode> childExps = ((NonLeafExpressionNode) node).getChildExps();
-      for (ExpressionNode child : childExps) {
-        getLabelOrdinals(child, labelOrdinals, auths, userName);
-      }
-    }
-  }
-
-  private void checkAuths(Set<Integer> auths, String userName, int labelOrdinal, String identifier)
-      throws IOException {
-    if (checkAuths && !isSystemOrSuperUser()) {
-      if (auths == null || (!auths.contains(labelOrdinal))) {
-        throw new AccessDeniedException("Visibility label " + identifier
-            + " not authorized for the user " + userName);
-      }
-    }
-  }
-
   @Override
   public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
       RegionScanner s) throws IOException {
+    if (!initialized) throw new IOException("VisibilityController not yet initialized!!");
     HRegion region = e.getEnvironment().getRegion();
     Authorizations authorizations = null;
-    // If a super user issues a scan, he should be able to scan the cells
-    // irrespective of the Visibility labels
-    if (checkIfScanOrGetFromSuperUser()) {
-      return s;
-    }
     try {
       authorizations = scan.getAuthorizations();
     } catch (DeserializationException de) {
       throw new IOException(de);
     }
-    Filter visibilityLabelFilter = createVisibilityLabelFilter(region, authorizations);
+    if (authorizations == null) {
+      // No Authorizations present for this scan/Get!
+      // In case of system tables other than "labels" just scan with out visibility check and
+      // filtering. Checking visibility labels for META and NAMESPACE table is not needed.
+      TableName table = region.getRegionInfo().getTable();
+      if (table.isSystemTable() && !table.equals(LABELS_TABLE_NAME)) {
+        return s;
+      }
+    }
+
+    Filter visibilityLabelFilter = VisibilityUtils.createVisibilityLabelFilter(region,
+        authorizations);
     if (visibilityLabelFilter != null) {
       Filter filter = scan.getFilter();
       if (filter != null) {
@@ -713,15 +442,6 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     return s;
   }
 
-  private boolean checkIfScanOrGetFromSuperUser() throws IOException {
-    User user = getActiveUser();
-    if (user != null && user.getShortName() != null) {
-      List<String> auths = this.visibilityManager.getAuths(user.getShortName());
-      return (auths.contains(SYSTEM_LABEL));
-    }
-    return false;
-  }
-
   @Override
   public DeleteTracker postInstantiateDeleteTracker(
       ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
@@ -738,10 +458,11 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     // but also on the visibility expression matching.
     return new VisibilityScanDeleteTracker();
   }
+
   @Override
   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Scan scan, final RegionScanner s) throws IOException {
-    User user = getActiveUser();
+    User user = VisibilityUtils.getActiveUser();
     if (user != null && user.getShortName() != null) {
       scannerOwners.put(s, user.getShortName());
     }
@@ -786,83 +507,33 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
       throws IOException {
+    if (!initialized) throw new IOException("VisibilityController not yet initialized!!");
+    HRegion region = e.getEnvironment().getRegion();
     Authorizations authorizations = null;
-    // If a super user issues a get, he should be able to scan the cells
-    // irrespective of the Visibility labels
-    if (checkIfScanOrGetFromSuperUser()) {
-      return;
-    }
     try {
       authorizations = get.getAuthorizations();
     } catch (DeserializationException de) {
       throw new IOException(de);
     }
-    Filter visibilityLabelFilter = createVisibilityLabelFilter(e.getEnvironment().getRegion(),
-        authorizations);
-    if (visibilityLabelFilter != null) {
-      Filter filter = get.getFilter();
-      if (filter != null) {
-        get.setFilter(new FilterList(filter, visibilityLabelFilter));
-      } else {
-        get.setFilter(visibilityLabelFilter);
-      }
-    }
-  }
-
-  private Filter createVisibilityLabelFilter(HRegion region, Authorizations authorizations)
-      throws IOException {
-    Map<ByteRange, Integer> cfVsMaxVersions = new HashMap<ByteRange, Integer>();
-    for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
-      cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
-    }
     if (authorizations == null) {
       // No Authorizations present for this scan/Get!
       // In case of system tables other than "labels" just scan with out visibility check and
       // filtering. Checking visibility labels for META and NAMESPACE table is not needed.
       TableName table = region.getRegionInfo().getTable();
       if (table.isSystemTable() && !table.equals(LABELS_TABLE_NAME)) {
-        return null;
+        return;
       }
     }
-    Filter visibilityLabelFilter = null;
-    if (this.scanLabelGenerators != null) {
-      List<String> labels = null;
-      for (ScanLabelGenerator scanLabelGenerator : this.scanLabelGenerators) {
-        try {
-          // null authorizations to be handled inside SLG impl.
-          labels = scanLabelGenerator.getLabels(getActiveUser(), authorizations);
-          labels = (labels == null) ? new ArrayList<String>() : labels;
-          authorizations = new Authorizations(labels);
-        } catch (Throwable t) {
-          LOG.error(t);
-          throw new IOException(t);
-        }
-      }
-      int labelsCount = this.visibilityManager.getLabelsCount();
-      BitSet bs = new BitSet(labelsCount + 1); // ordinal is index 1 based
-      if (labels != null) {
-        for (String label : labels) {
-          int labelOrdinal = this.visibilityManager.getLabelOrdinal(label);
-          if (labelOrdinal != 0) {
-            bs.set(labelOrdinal);
-          }
-        }
+    Filter visibilityLabelFilter = VisibilityUtils.createVisibilityLabelFilter(e.getEnvironment()
+        .getRegion(), authorizations);
+    if (visibilityLabelFilter != null) {
+      Filter filter = get.getFilter();
+      if (filter != null) {
+        get.setFilter(new FilterList(filter, visibilityLabelFilter));
+      } else {
+        get.setFilter(visibilityLabelFilter);
       }
-      visibilityLabelFilter = new VisibilityLabelFilter(bs, cfVsMaxVersions);
-    }
-    return visibilityLabelFilter;
-  }
-
-  private User getActiveUser() throws IOException {
-    User user = RequestContext.getRequestUser();
-    if (!RequestContext.isInRequestContext()) {
-      // for non-rpc handling, fallback to system user
-      user = User.getCurrent();
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Current active user name is "+user.getShortName());
     }
-    return user;
   }
 
   private List<String> getSystemAndSuperUsers() throws IOException {
@@ -881,7 +552,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   }
 
   private boolean isSystemOrSuperUser() throws IOException {
-    User activeUser = getActiveUser();
+    User activeUser = VisibilityUtils.getActiveUser();
     return this.superUsers.contains(activeUser.getShortName());
   }
 
@@ -920,27 +591,20 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     if (cellVisibility == null) {
       return newCell;
     }
-    Set<Integer> auths = null;
-    User user = getActiveUser();
-    if (checkAuths && user != null && user.getShortName() != null) {
-      auths = this.visibilityManager.getAuthsAsOrdinals(user.getShortName());
-    }
     // Prepend new visibility tags to a new list of tags for the cell
-    try {
-      tags.addAll(createVisibilityTags(cellVisibility.getExpression(), true, auths,
-        user.getShortName()));
-    } catch (ParseException e) {
-      throw new IOException(e);
-    }
+    // Don't check user auths for labels with Mutations when the user is super user
+    boolean authCheck = this.checkAuths && !(isSystemOrSuperUser());
+    tags.addAll(this.visibilityLabelService.createVisibilityExpTags(cellVisibility.getExpression(),
+        true, authCheck));
     // Save an object allocation where we can
     if (newCell.getTagsLength() > 0) {
       // Carry forward all other tags
-      Iterator<Tag> tagsItr = CellUtil.tagsIterator(newCell.getTagsArray(), newCell.getTagsOffset(),
-        newCell.getTagsLength());
+      Iterator<Tag> tagsItr = CellUtil.tagsIterator(newCell.getTagsArray(),
+          newCell.getTagsOffset(), newCell.getTagsLength());
       while (tagsItr.hasNext()) {
         Tag tag = tagsItr.next();
-        if (tag.getType() != VisibilityUtils.VISIBILITY_TAG_TYPE
-            && tag.getType() != VisibilityUtils.VISIBILITY_EXP_SERIALIZATION_TAG_TYPE) {
+        if (tag.getType() != TagType.VISIBILITY_TAG_TYPE
+            && tag.getType() != TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE) {
           tags.add(tag);
         }
       }
@@ -955,7 +619,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
             .getTypeByte()), newCell.getValueArray(), newCell.getValueOffset(),
         newCell.getValueLength(), tags);
     // Preserve mvcc data
-    rewriteKv.setSequenceId(newCell.getMvccVersion());
+    rewriteKv.setSequenceId(newCell.getSequenceId());
     return rewriteKv;
   }
 
@@ -969,52 +633,41 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   public synchronized void addLabels(RpcController controller, VisibilityLabelsRequest request,
       RpcCallback<VisibilityLabelsResponse> done) {
     VisibilityLabelsResponse.Builder response = VisibilityLabelsResponse.newBuilder();
-    List<VisibilityLabel> labels = request.getVisLabelList();
+    List<VisibilityLabel> visLabels = request.getVisLabelList();
     if (!initialized) {
-      setExceptionResults(labels.size(), new CoprocessorException(
+      setExceptionResults(visLabels.size(), new CoprocessorException(
           "VisibilityController not yet initialized"), response);
-    }
-    try {
-      checkCallingUserAuth();
-      List<Mutation> puts = new ArrayList<Mutation>(labels.size());
-      RegionActionResult successResult = RegionActionResult.newBuilder().build();
-      for (VisibilityLabel visLabel : labels) {
-        byte[] label = visLabel.getLabel().toByteArray();
-        String labelStr = Bytes.toString(label);
-        if (this.visibilityManager.getLabelOrdinal(labelStr) > 0) {
-          RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
-          failureResultBuilder.setException(ResponseConverter
-              .buildException(new LabelAlreadyExistsException("Label '" + labelStr
-                  + "' already exists")));
-          response.addResult(failureResultBuilder.build());
-        } else {
-          Put p = new Put(Bytes.toBytes(ordinalCounter));
-          p.addImmutable(LABELS_TABLE_FAMILY, LABEL_QUALIFIER, label, LABELS_TABLE_TAGS);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding the label " + labelStr);
-          }
-          puts.add(p);
-          ordinalCounter++;
-          response.addResult(successResult);
+    } else {
+      try {
+        checkCallingUserAuth();
+        List<byte[]> labels = new ArrayList<byte[]>(visLabels.size());
+        RegionActionResult successResult = RegionActionResult.newBuilder().build();
+        for (VisibilityLabel visLabel : visLabels) {
+          byte[] label = visLabel.getLabel().toByteArray();
+          labels.add(label);
+          response.addResult(successResult); // Just mark as success. Later it will get reset
+                                             // based on the result from
+                                             // visibilityLabelService.addLabels ()
         }
-      }
-      OperationStatus[] opStatus = this.regionEnv.getRegion().batchMutate(
-          puts.toArray(new Mutation[puts.size()]));
-      int i = 0;
-      for (OperationStatus status : opStatus) {
-        if (status.getOperationStatusCode() != SUCCESS) {
-          while (response.getResult(i) != successResult)
+        if (!labels.isEmpty()) {
+          OperationStatus[] opStatus = this.visibilityLabelService.addLabels(labels);
+          int i = 0;
+          for (OperationStatus status : opStatus) {
+            while (response.getResult(i) != successResult)
+              i++;
+            if (status.getOperationStatusCode() != SUCCESS) {
+              RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
+              failureResultBuilder.setException(ResponseConverter
+                  .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
+              response.setResult(i, failureResultBuilder.build());
+            }
             i++;
-          RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
-          failureResultBuilder.setException(ResponseConverter
-              .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
-          response.setResult(i, failureResultBuilder.build());
+          }
         }
-        i++;
+      } catch (IOException e) {
+        LOG.error(e);
+        setExceptionResults(visLabels.size(), e, response);
       }
-    } catch (IOException e) {
-      LOG.error(e);
-      setExceptionResults(labels.size(), e, response);
     }
     done.run(response.build());
   }
@@ -1029,34 +682,6 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     }
   }
 
-  private void performACLCheck() throws IOException {
-    // Do ACL check only when the security is enabled.
-    if (this.acOn && !isSystemOrSuperUser()) {
-      User user = getActiveUser();
-      throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null")
-          + " is not authorized to perform this action.");
-    }
-  }
-
-  private List<List<Cell>> getExistingLabelsWithAuths() throws IOException {
-    Scan scan = new Scan();
-    RegionScanner scanner = this.regionEnv.getRegion().getScanner(scan);
-    List<List<Cell>> existingLabels = new ArrayList<List<Cell>>();
-    try {
-      while (true) {
-        List<Cell> cells = new ArrayList<Cell>();
-        scanner.next(cells);
-        if (cells.isEmpty()) {
-          break;
-        }
-        existingLabels.add(cells);
-      }
-    } finally {
-      scanner.close();
-    }
-    return existingLabels;
-  }
-
   @Override
   public synchronized void setAuths(RpcController controller, SetAuthsRequest request,
       RpcCallback<VisibilityLabelsResponse> done) {
@@ -1065,46 +690,30 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     if (!initialized) {
       setExceptionResults(auths.size(), new CoprocessorException(
           "VisibilityController not yet initialized"), response);
-    }
-    byte[] user = request.getUser().toByteArray();
-    try {
-      checkCallingUserAuth();
-      List<Mutation> puts = new ArrayList<Mutation>(auths.size());
-      RegionActionResult successResult = RegionActionResult.newBuilder().build();
-      for (ByteString authBS : auths) {
-        byte[] auth = authBS.toByteArray();
-        String authStr = Bytes.toString(auth);
-        int labelOrdinal = this.visibilityManager.getLabelOrdinal(authStr);
-        if (labelOrdinal == 0) {
-          // This label is not yet added. 1st this should be added to the system
-          RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
-          failureResultBuilder.setException(ResponseConverter
-              .buildException(new InvalidLabelException("Label '" + authStr + "' doesn't exist")));
-          response.addResult(failureResultBuilder.build());
-        } else {
-          Put p = new Put(Bytes.toBytes(labelOrdinal));
-          p.addImmutable(
-              LABELS_TABLE_FAMILY, user, DUMMY_VALUE, LABELS_TABLE_TAGS);
-          puts.add(p);
-          response.addResult(successResult);
+    } else {
+      try {
+        checkCallingUserAuth();
+        List<byte[]> labelAuths = new ArrayList<byte[]>(auths.size());
+        for (ByteString authBS : auths) {
+          labelAuths.add(authBS.toByteArray());
         }
-      }
-      OperationStatus[] opStatus = this.regionEnv.getRegion().batchMutate(
-          puts.toArray(new Mutation[puts.size()]));
-      int i = 0;
-      for (OperationStatus status : opStatus) {
-        if (status.getOperationStatusCode() != SUCCESS) {
-          while (response.getResult(i) != successResult) i++;
-          RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
-          failureResultBuilder.setException(ResponseConverter
-              .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
-          response.setResult(i, failureResultBuilder.build());
+        OperationStatus[] opStatus = this.visibilityLabelService.setAuths(request.getUser()
+            .toByteArray(), labelAuths);
+        RegionActionResult successResult = RegionActionResult.newBuilder().build();
+        for (OperationStatus status : opStatus) {
+          if (status.getOperationStatusCode() == SUCCESS) {
+            response.addResult(successResult);
+          } else {
+            RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
+            failureResultBuilder.setException(ResponseConverter
+                .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
+            response.addResult(failureResultBuilder.build());
+          }
         }
-        i++;
+      } catch (IOException e) {
+        LOG.error(e);
+        setExceptionResults(auths.size(), e, response);
       }
-    } catch (IOException e) {
-      LOG.error(e);
-      setExceptionResults(auths.size(), e, response);
     }
     done.run(response.build());
   }
@@ -1112,44 +721,33 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public synchronized void getAuths(RpcController controller, GetAuthsRequest request,
       RpcCallback<GetAuthsResponse> done) {
-    byte[] user = request.getUser().toByteArray();
     GetAuthsResponse.Builder response = GetAuthsResponse.newBuilder();
-    response.setUser(request.getUser());
-    try {
-      List<String> labels = getUserAuthsFromLabelsTable(user);
-      for (String label : labels) {
-        response.addAuth(ByteStringer.wrap(Bytes.toBytes(label)));
+    if (!initialized) {
+      controller.setFailed("VisibilityController not yet initialized");
+    } else {
+      byte[] user = request.getUser().toByteArray();
+      List<String> labels = null;
+      try {
+        // We do ACL check here as we create scanner directly on region. It will not make calls to
+        // AccessController CP methods.
+        if (this.acOn && !isSystemOrSuperUser()) {
+          User requestingUser = VisibilityUtils.getActiveUser();
+          throw new AccessDeniedException("User '"
+              + (requestingUser != null ? requestingUser.getShortName() : "null")
+              + " is not authorized to perform this action.");
+        }
+        labels = this.visibilityLabelService.getAuths(user, false);
+      } catch (IOException e) {
+        ResponseConverter.setControllerException(controller, e);
       }
-    } catch (IOException e) {
-      ResponseConverter.setControllerException(controller, e);
-    }
-    done.run(response.build());
-  }
-
-  private List<String> getUserAuthsFromLabelsTable(byte[] user) throws IOException {
-    Scan s = new Scan();
-    s.addColumn(LABELS_TABLE_FAMILY, user);
-    Filter filter = createVisibilityLabelFilter(this.regionEnv.getRegion(), new Authorizations(
-        SYSTEM_LABEL));
-    s.setFilter(filter);
-    List<String> auths = new ArrayList<String>();
-    // We do ACL check here as we create scanner directly on region. It will not make calls to
-    // AccessController CP methods.
-    performACLCheck();
-    RegionScanner scanner = this.regionEnv.getRegion().getScanner(s);
-    List<Cell> results = new ArrayList<Cell>(1);
-    while (true) {
-      scanner.next(results);
-      if (results.isEmpty()) break;
-      Cell cell = results.get(0);
-      int ordinal = Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-      String label = this.visibilityManager.getLabel(ordinal);
-      if (label != null) {
-        auths.add(label);
+      response.setUser(request.getUser());
+      if (labels != null) {
+        for (String label : labels) {
+          response.addAuth(ByteStringer.wrap(Bytes.toBytes(label)));
+        }
       }
-      results.clear();
     }
-    return auths;
+    done.run(response.build());
   }
 
   @Override
@@ -1160,84 +758,73 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     if (!initialized) {
       setExceptionResults(auths.size(), new CoprocessorException(
           "VisibilityController not yet initialized"), response);
-    }
-    byte[] user = request.getUser().toByteArray();
-    try {
-      checkCallingUserAuth();
-      List<String> currentAuths = this.getUserAuthsFromLabelsTable(user);
-      List<Mutation> deletes = new ArrayList<Mutation>(auths.size());
-      RegionActionResult successResult = RegionActionResult.newBuilder().build();
-      for (ByteString authBS : auths) {
-        byte[] auth = authBS.toByteArray();
-        String authStr = Bytes.toString(auth);
-        if (currentAuths.contains(authStr)) {
-          int labelOrdinal = this.visibilityManager.getLabelOrdinal(authStr);
-          assert labelOrdinal > 0;
-          Delete d = new Delete(Bytes.toBytes(labelOrdinal));
-          d.deleteColumns(LABELS_TABLE_FAMILY, user);
-          deletes.add(d);
-          response.addResult(successResult);
-        } else {
-          // This label is not set for the user.
-          RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
-          failureResultBuilder.setException(ResponseConverter
-              .buildException(new InvalidLabelException("Label '" + authStr
-                  + "' is not set for the user " + Bytes.toString(user))));
-          response.addResult(failureResultBuilder.build());
+    } else {
+      try {
+        // When AC is ON, do AC based user auth check
+        if (this.acOn && !isSystemOrSuperUser()) {
+          User user = VisibilityUtils.getActiveUser();
+          throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null")
+              + " is not authorized to perform this action.");
         }
-      }
-      OperationStatus[] opStatus = this.regionEnv.getRegion().batchMutate(
-          deletes.toArray(new Mutation[deletes.size()]));
-      int i = 0;
-      for (OperationStatus status : opStatus) {
-        if (status.getOperationStatusCode() != SUCCESS) {
-          while (response.getResult(i) != successResult) i++;
-          RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
-          failureResultBuilder.setException(ResponseConverter
-              .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
-          response.setResult(i, failureResultBuilder.build());
+        checkCallingUserAuth(); // When AC is not in place the calling user should have SYSTEM_LABEL
+                                // auth to do this action.
+        List<byte[]> labelAuths = new ArrayList<byte[]>(auths.size());
+        for (ByteString authBS : auths) {
+          labelAuths.add(authBS.toByteArray());
         }
-        i++;
+        OperationStatus[] opStatus = this.visibilityLabelService.clearAuths(request.getUser()
+            .toByteArray(), labelAuths);
+        RegionActionResult successResult = RegionActionResult.newBuilder().build();
+        for (OperationStatus status : opStatus) {
+          if (status.getOperationStatusCode() == SUCCESS) {
+            response.addResult(successResult);
+          } else {
+            RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
+            failureResultBuilder.setException(ResponseConverter
+                .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
+            response.addResult(failureResultBuilder.build());
+          }
+        }
+      } catch (IOException e) {
+        LOG.error(e);
+        setExceptionResults(auths.size(), e, response);
       }
-    } catch (IOException e) {
-      LOG.error(e);
-      setExceptionResults(auths.size(), e, response);
     }
     done.run(response.build());
   }
 
   private void checkCallingUserAuth() throws IOException {
     if (!this.acOn) {
-      User user = getActiveUser();
+      User user = VisibilityUtils.getActiveUser();
       if (user == null) {
         throw new IOException("Unable to retrieve calling user");
       }
-      List<String> auths = this.visibilityManager.getAuths(user.getShortName());
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("The list of auths are "+auths);
-      }
-      if (!auths.contains(SYSTEM_LABEL)) {
+      if (!(this.visibilityLabelService.havingSystemAuth(Bytes.toBytes(user.getShortName())))) {
         throw new AccessDeniedException("User '" + user.getShortName()
             + "' is not authorized to perform this action.");
       }
     }
   }
 
-  static class DeleteVersionVisibilityExpressionFilter extends FilterBase {
-    private List<Tag> visibilityTags;
+  private static class DeleteVersionVisibilityExpressionFilter extends FilterBase {
+    private List<Tag> deleteCellVisTags;
+    private Byte deleteCellVisTagsFormat;
 
-    public DeleteVersionVisibilityExpressionFilter(List<Tag> visibilityTags) {
-      this.visibilityTags = visibilityTags;
+    public DeleteVersionVisibilityExpressionFilter(List<Tag> deleteCellVisTags,
+        Byte deleteCellVisTagsFormat) {
+      this.deleteCellVisTags = deleteCellVisTags;
+      this.deleteCellVisTagsFormat = deleteCellVisTagsFormat;
     }
 
     @Override
-    public ReturnCode filterKeyValue(Cell kv) throws IOException {
-      boolean matchFound = VisibilityUtils.checkForMatchingVisibilityTags(kv, visibilityTags);
-      if (matchFound) {
-        return ReturnCode.INCLUDE;
-      } else {
-        return ReturnCode.SKIP;
-      }
+    public ReturnCode filterKeyValue(Cell cell) throws IOException {
+      List<Tag> putVisTags = new ArrayList<Tag>();
+      Byte putCellVisTagsFormat = VisibilityUtils.extractVisibilityTags(cell, putVisTags);
+      boolean matchFound = VisibilityLabelServiceManager
+          .getInstance().getVisibilityLabelService()
+          .matchVisibility(putVisTags, putCellVisTagsFormat, deleteCellVisTags,
+              deleteCellVisTagsFormat);
+      return matchFound ? ReturnCode.INCLUDE : ReturnCode.SKIP;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityExpEvaluator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityExpEvaluator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityExpEvaluator.java
new file mode 100644
index 0000000..ef21af4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityExpEvaluator.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Cell;
+
+/**
+ * During the read (ie. get/Scan) the VisibilityController calls this interface for each of the
+ * Cell. Depending on the evaluate() result, a cell can be either included or filtered out from the
+ * read results.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface VisibilityExpEvaluator {
+
+  /**
+   * Evaluates whether the passed cell passes Scan/Get Authorization.
+   * @param cell Cell under evaluation
+   * @return true if this cell can be included in the Result. Else false.
+   */
+  boolean evaluate(Cell cell) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelFilter.java
index ea579ff..a080810 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelFilter.java
@@ -18,19 +18,13 @@
 package org.apache.hadoop.hbase.security.visibility;
 
 import java.io.IOException;
-import java.util.BitSet;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
 
 /**
@@ -40,15 +34,16 @@ import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
 @InterfaceAudience.Private
 class VisibilityLabelFilter extends FilterBase {
 
-  private final BitSet authLabels;
+  private final VisibilityExpEvaluator expEvaluator;
   private final Map<ByteRange, Integer> cfVsMaxVersions;
   private final ByteRange curFamily;
   private final ByteRange curQualifier;
   private int curFamilyMaxVersions;
   private int curQualMetVersions;
 
-  public VisibilityLabelFilter(BitSet authLabels, Map<ByteRange, Integer> cfVsMaxVersions) {
-    this.authLabels = authLabels;
+  public VisibilityLabelFilter(VisibilityExpEvaluator expEvaluator,
+      Map<ByteRange, Integer> cfVsMaxVersions) {
+    this.expEvaluator = expEvaluator;
     this.cfVsMaxVersions = cfVsMaxVersions;
     this.curFamily = new SimpleMutableByteRange();
     this.curQualifier = new SimpleMutableByteRange();
@@ -80,46 +75,7 @@ class VisibilityLabelFilter extends FilterBase {
       return ReturnCode.SKIP;
     }
 
-    boolean visibilityTagPresent = false;
-    // Save an object allocation where we can
-    if (cell.getTagsLength() > 0) {
-      Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
-        cell.getTagsLength());
-      while (tagsItr.hasNext()) {
-        boolean includeKV = true;
-        Tag tag = tagsItr.next();
-        if (tag.getType() == VisibilityUtils.VISIBILITY_TAG_TYPE) {
-          visibilityTagPresent = true;
-          int offset = tag.getTagOffset();
-          int endOffset = offset + tag.getTagLength();
-          while (offset < endOffset) {
-            Pair<Integer, Integer> result = StreamUtils.readRawVarint32(tag.getBuffer(), offset);
-            int currLabelOrdinal = result.getFirst();
-            if (currLabelOrdinal < 0) {
-              // check for the absence of this label in the Scan Auth labels
-              // ie. to check BitSet corresponding bit is 0
-              int temp = -currLabelOrdinal;
-              if (this.authLabels.get(temp)) {
-                includeKV = false;
-                break;
-              }
-            } else {
-              if (!this.authLabels.get(currLabelOrdinal)) {
-                includeKV = false;
-                break;
-              }
-            }
-            offset += result.getSecond();
-          }
-          if (includeKV) {
-            // We got one visibility expression getting evaluated to true. Good to include this KV in
-            // the result then.
-            return ReturnCode.INCLUDE;
-          }
-        }
-      }
-    }
-    return visibilityTagPresent ? ReturnCode.SKIP : ReturnCode.INCLUDE;
+    return this.expEvaluator.evaluate(cell) ? ReturnCode.INCLUDE : ReturnCode.SKIP;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelService.java
new file mode 100644
index 0000000..667fc46
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelService.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+
+/**
+ * The interface which deals with visibility labels and user auths admin service as well as the cell
+ * visibility expression storage part and read time evaluation.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface VisibilityLabelService extends Configurable {
+
+  /**
+   * System calls this after opening of regions. Gives a chance for the VisibilityLabelService to so
+   * any initialization logic.
+   * @param e
+   *          the region coprocessor env
+   */
+  void init(RegionCoprocessorEnvironment e) throws IOException;
+
+  /**
+   * Adds the set of labels into the system.
+   * @param labels
+   *          Labels to add to the system.
+   * @return OperationStatus for each of the label addition
+   */
+  OperationStatus[] addLabels(List<byte[]> labels) throws IOException;
+
+  /**
+   * Sets given labels globally authorized for the user.
+   * @param user
+   *          The authorizing user
+   * @param authLabels
+   *          Labels which are getting authorized for the user
+   * @return OperationStatus for each of the label auth addition
+   */
+  OperationStatus[] setAuths(byte[] user, List<byte[]> authLabels) throws IOException;
+
+  /**
+   * Removes given labels from user's globally authorized list of labels.
+   * @param user
+   *          The user whose authorization to be removed
+   * @param authLabels
+   *          Labels which are getting removed from authorization set
+   * @return OperationStatus for each of the label auth removal
+   */
+  OperationStatus[] clearAuths(byte[] user, List<byte[]> authLabels) throws IOException;
+
+  /**
+   * @param user
+   *          Name of the user whose authorization to be retrieved
+   * @param systemCall
+   *          Whether a system or user originated call.
+   * @return Visibility labels authorized for the given user.
+   */
+  List<String> getAuths(byte[] user, boolean systemCall) throws IOException;
+
+  /**
+   * Creates tags corresponding to given visibility expression.
+   * <br>
+   * Note: This will be concurrently called from multiple threads and implementation should
+   * take care of thread safety.
+   * @param visExpression The Expression for which corresponding Tags to be created.
+   * @param withSerializationFormat specifies whether a tag, denoting the serialization version
+   *          of the tags, to be added in the list. When this is true make sure to add the
+   *          serialization format Tag also. The format tag value should be byte type.
+   * @param checkAuths denotes whether to check individual labels in visExpression against user's
+   *          global auth label.
+   * @return The list of tags corresponds to the visibility expression. These tags will be stored
+   *         along with the Cells.
+   */
+  List<Tag> createVisibilityExpTags(String visExpression, boolean withSerializationFormat,
+      boolean checkAuths) throws IOException;
+
+  /**
+   * Creates VisibilityExpEvaluator corresponding to given Authorizations. <br>
+   * Note: This will be concurrently called from multiple threads and implementation should take
+   * care of thread safety.
+   * @param authorizations
+   *          Authorizations for the read request
+   * @return The VisibilityExpEvaluator corresponding to the given set of authorization labels.
+   */
+  VisibilityExpEvaluator getVisibilityExpEvaluator(Authorizations authorizations)
+      throws IOException;
+
+  /**
+   * System checks for user auth during admin operations. (ie. Label add, set/clear auth). The
+   * operation is allowed only for users having system auth. Also during read, if the requesting
+   * user has system auth, he can view all the data irrespective of its labels.
+   * @param user
+   *          User for whom system auth check to be done.
+   * @return true if the given user is having system/super auth
+   */
+  boolean havingSystemAuth(byte[] user) throws IOException;
+
+  /**
+   * System uses this for deciding whether a Cell can be deleted by matching visibility expression
+   * in Delete mutation and the cell in consideration. Also system passes the serialization format
+   * of visibility tags in Put and Delete.<br>
+   * Note: This will be concurrently called from multiple threads and implementation should take
+   * care of thread safety.
+   * @param putVisTags
+   *          The visibility tags present in the Put mutation
+   * @param putVisTagFormat
+   *          The serialization format for the Put visibility tags. A <code>null</code> value for
+   *          this format means the tags are written with unsorted label ordinals
+   * @param deleteVisTags
+   *          - The visibility tags in the delete mutation (the specified Cell Visibility)
+   * @param deleteVisTagFormat
+   *          The serialization format for the Delete visibility tags. A <code>null</code> value for
+   *          this format means the tags are written with unsorted label ordinals
+   * @return true if matching tags are found
+   * @see VisibilityConstants#SORTED_ORDINAL_SERIALIZATION_FORMAT
+   */
+  boolean matchVisibility(List<Tag> putVisTags, Byte putVisTagFormat, List<Tag> deleteVisTags,
+      Byte deleteVisTagFormat) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelServiceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelServiceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelServiceManager.java
new file mode 100644
index 0000000..7ed99c3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelServiceManager.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Manages singleton instance of {@link VisibilityLabelService}
+ */
+@InterfaceAudience.Private
+public class VisibilityLabelServiceManager {
+
+  private static final Log LOG = LogFactory.getLog(VisibilityLabelServiceManager.class);
+
+  public static final String VISIBILITY_LABEL_SERVICE_CLASS =
+      "hbase.regionserver.visibility.label.service.class";
+  private static final VisibilityLabelServiceManager INSTANCE = new VisibilityLabelServiceManager();
+
+  private volatile VisibilityLabelService visibilityLabelService = null;
+  private String vlsClazzName = null;
+
+  private VisibilityLabelServiceManager() {
+
+  }
+
+  public static VisibilityLabelServiceManager getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * @param conf
+   * @return singleton instance of {@link VisibilityLabelService}. The FQCN of the implementation
+   *         class can be specified using "hbase.regionserver.visibility.label.service.class".
+   * @throws IOException When VLS implementation, as specified in conf, can not be loaded.
+   */
+  public VisibilityLabelService getVisibilityLabelService(Configuration conf) throws IOException {
+    String vlsClassName = conf.get(VISIBILITY_LABEL_SERVICE_CLASS,
+        DefaultVisibilityLabelServiceImpl.class.getCanonicalName()).trim();
+    if (this.visibilityLabelService != null) {
+      checkForClusterLevelSingleConf(vlsClassName);
+      return this.visibilityLabelService;
+    }
+    synchronized (this) {
+      if (this.visibilityLabelService != null) {
+        checkForClusterLevelSingleConf(vlsClassName);
+        return this.visibilityLabelService;
+      }
+      this.vlsClazzName = vlsClassName;
+      try {
+        this.visibilityLabelService = (VisibilityLabelService) ReflectionUtils.newInstance(
+            Class.forName(vlsClassName), conf);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+      return this.visibilityLabelService;
+    }
+  }
+
+  private void checkForClusterLevelSingleConf(String vlsClassName) {
+    assert this.vlsClazzName != null;
+    if (!this.vlsClazzName.equals(vlsClassName)) {
+      LOG.warn("Trying to use table specific value for config "
+          + "'hbase.regionserver.visibility.label.service.class' which is not supported."
+          + " Will use the cluster level VisibilityLabelService class " + this.vlsClazzName);
+    }
+  }
+
+  /**
+   * @return singleton instance of {@link VisibilityLabelService}.
+   * @throws IllegalStateException if this called before initialization of singleton instance.
+   */
+  public VisibilityLabelService getVisibilityLabelService() {
+    // By the time this method is called, the singleton instance of visibilityLabelService should
+    // have been created. And it will be created as getVisibilityLabelService(Configuration conf)
+    // is called from VC#start() and that will be the 1st thing core code do with any CP.
+    if (this.visibilityLabelService == null) {
+      throw new IllegalStateException("VisibilityLabelService not yet instantiated");
+    }
+    return this.visibilityLabelService;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsCache.java
new file mode 100644
index 0000000..731d1cd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsCache.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations;
+import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.UserAuthorizations;
+import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabel;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Maintains the cache for visibility labels and also uses the zookeeper to update the labels in the
+ * system. The cache updation happens based on the data change event that happens on the zookeeper
+ * znode for labels table
+ */
+@InterfaceAudience.Private
+public class VisibilityLabelsCache {
+
+  private static final Log LOG = LogFactory.getLog(VisibilityLabelsCache.class);
+  private static final int NON_EXIST_LABEL_ORDINAL = 0;
+  private static final List<String> EMPTY_LIST = Collections.emptyList();
+  private static final Set<Integer> EMPTY_SET = Collections.emptySet();
+  private static VisibilityLabelsCache instance;
+
+  private ZKVisibilityLabelWatcher zkVisibilityWatcher;
+  private Map<String, Integer> labels = new HashMap<String, Integer>();
+  private Map<Integer, String> ordinalVsLabels = new HashMap<Integer, String>();
+  private Map<String, Set<Integer>> userAuths = new HashMap<String, Set<Integer>>();
+  /**
+   * This covers the members labels, ordinalVsLabels and userAuths
+   */
+  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private VisibilityLabelsCache(ZooKeeperWatcher watcher, Configuration conf) throws IOException {
+    zkVisibilityWatcher = new ZKVisibilityLabelWatcher(watcher, this, conf);
+    try {
+      zkVisibilityWatcher.start();
+    } catch (KeeperException ke) {
+      LOG.error("ZooKeeper initialization failed", ke);
+      throw new IOException(ke);
+    }
+  }
+
+  /**
+   * Creates the singleton instance, if not yet present, and returns the same.
+   * @param watcher
+   * @param conf
+   * @return Singleton instance of VisibilityLabelsCache
+   * @throws IOException
+   */
+  public synchronized static VisibilityLabelsCache createAndGet(ZooKeeperWatcher watcher,
+      Configuration conf) throws IOException {
+    // VisibilityLabelService#init() for different regions (in same RS) passes same instance of
+    // watcher as all get the instance from RS.
+    // watcher != instance.zkVisibilityWatcher.getWatcher() - This check is needed only in UTs with
+    // RS restart. It will be same JVM in which RS restarts and instance will be not null. But the
+    // watcher associated with existing instance will be stale as the restarted RS will have new
+    // watcher with it.
+    if (instance == null || watcher != instance.zkVisibilityWatcher.getWatcher()) {
+      instance = new VisibilityLabelsCache(watcher, conf);
+    }
+    return instance;
+  }
+
+  /**
+   * @return Singleton instance of VisibilityLabelsCache
+   * @throws IllegalStateException
+   *           when this is called before calling
+   *           {@link #createAndGet(ZooKeeperWatcher, Configuration)}
+   */
+  public static VisibilityLabelsCache get() {
+    // By the time this method is called, the singleton instance of VisibilityLabelsCache should
+    // have been created.
+    if (instance == null) {
+      throw new IllegalStateException("VisibilityLabelsCache not yet instantiated");
+    }
+    return instance;
+  }
+
+  public void refreshLabelsCache(byte[] data) throws IOException {
+    List<VisibilityLabel> visibilityLabels = null;
+    try {
+      visibilityLabels = VisibilityUtils.readLabelsFromZKData(data);
+    } catch (DeserializationException dse) {
+      throw new IOException(dse);
+    }
+    this.lock.writeLock().lock();
+    try {
+      labels.clear();
+      ordinalVsLabels.clear();
+      for (VisibilityLabel visLabel : visibilityLabels) {
+        String label = Bytes.toString(visLabel.getLabel().toByteArray());
+        labels.put(label, visLabel.getOrdinal());
+        ordinalVsLabels.put(visLabel.getOrdinal(), label);
+      }
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  public void refreshUserAuthsCache(byte[] data) throws IOException {
+    MultiUserAuthorizations multiUserAuths = null;
+    try {
+      multiUserAuths = VisibilityUtils.readUserAuthsFromZKData(data);
+    } catch (DeserializationException dse) {
+      throw new IOException(dse);
+    }
+    this.lock.writeLock().lock();
+    try {
+      this.userAuths.clear();
+      for (UserAuthorizations userAuths : multiUserAuths.getUserAuthsList()) {
+        String user = Bytes.toString(userAuths.getUser().toByteArray());
+        this.userAuths.put(user, new HashSet<Integer>(userAuths.getAuthList()));
+      }
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * @param label Not null label string
+   * @return The ordinal for the label. The ordinal starts from 1. Returns 0 when passed a non
+   *         existing label.
+   */
+  public int getLabelOrdinal(String label) {
+    Integer ordinal = null;
+    this.lock.readLock().lock();
+    try {
+      ordinal = labels.get(label);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    if (ordinal != null) {
+      return ordinal.intValue();
+    }
+    // 0 denotes not available
+    return NON_EXIST_LABEL_ORDINAL;
+  }
+
+  /**
+   * @param ordinal The ordinal of label which we are looking for.
+   * @return The label having the given ordinal. Returns <code>null</code> when no label exist in
+   *         the system with given ordinal
+   */
+  public String getLabel(int ordinal) {
+    this.lock.readLock().lock();
+    try {
+      return this.ordinalVsLabels.get(ordinal);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * @return The total number of visibility labels.
+   */
+  public int getLabelsCount() {
+    this.lock.readLock().lock();
+    try {
+      return this.labels.size();
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  public List<String> getAuths(String user) {
+    List<String> auths = EMPTY_LIST;
+    this.lock.readLock().lock();
+    try {
+      Set<Integer> authOrdinals = userAuths.get(user);
+      if (authOrdinals != null) {
+        auths = new ArrayList<String>(authOrdinals.size());
+        for (Integer authOrdinal : authOrdinals) {
+          auths.add(ordinalVsLabels.get(authOrdinal));
+        }
+      }
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    return auths;
+  }
+
+  /**
+   * Returns the list of ordinals of authentications associated with the user
+   *
+   * @param user Not null value.
+   * @return the list of ordinals
+   */
+  public Set<Integer> getAuthsAsOrdinals(String user) {
+    this.lock.readLock().lock();
+    try {
+      Set<Integer> auths = userAuths.get(user);
+      return (auths == null) ? EMPTY_SET : auths;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  public void writeToZookeeper(byte[] data, boolean labelsOrUserAuths) {
+    this.zkVisibilityWatcher.writeToZookeeper(data, labelsOrUserAuths);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
deleted file mode 100644
index e840c64..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security.visibility;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations;
-import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.UserAuthorizations;
-import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabel;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Maintains the cache for visibility labels and also uses the zookeeper to update the labels in the
- * system. The cache updation happens based on the data change event that happens on the zookeeper
- * znode for labels table
- */
-@InterfaceAudience.Private
-public class VisibilityLabelsManager {
-
-  private static final Log LOG = LogFactory.getLog(VisibilityLabelsManager.class);
-  private static final List<String> EMPTY_LIST = new ArrayList<String>(0);
-  private static VisibilityLabelsManager instance;
-
-  private ZKVisibilityLabelWatcher zkVisibilityWatcher;
-  private Map<String, Integer> labels = new HashMap<String, Integer>();
-  private Map<Integer, String> ordinalVsLabels = new HashMap<Integer, String>();
-  private Map<String, Set<Integer>> userAuths = new HashMap<String, Set<Integer>>();
-  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-  private VisibilityLabelsManager(ZooKeeperWatcher watcher, Configuration conf) throws IOException {
-    zkVisibilityWatcher = new ZKVisibilityLabelWatcher(watcher, this, conf);
-    try {
-      zkVisibilityWatcher.start();
-    } catch (KeeperException ke) {
-      LOG.error("ZooKeeper initialization failed", ke);
-      throw new IOException(ke);
-    }
-  }
-
-  public synchronized static VisibilityLabelsManager get(ZooKeeperWatcher watcher,
-      Configuration conf) throws IOException {
-    if (instance == null) {
-      instance = new VisibilityLabelsManager(watcher, conf);
-    }
-    return instance;
-  }
-
-  public static VisibilityLabelsManager get() {
-    return instance;
-  }
-
-  public void refreshLabelsCache(byte[] data) throws IOException {
-    List<VisibilityLabel> visibilityLabels = null;
-    try {
-      visibilityLabels = VisibilityUtils.readLabelsFromZKData(data);
-    } catch (DeserializationException dse) {
-      throw new IOException(dse);
-    }
-    this.lock.writeLock().lock();
-    try {
-      for (VisibilityLabel visLabel : visibilityLabels) {
-        String label = Bytes.toString(visLabel.getLabel().toByteArray());
-        labels.put(label, visLabel.getOrdinal());
-        ordinalVsLabels.put(visLabel.getOrdinal(), label);
-      }
-    } finally {
-      this.lock.writeLock().unlock();
-    }
-  }
-
-  public void refreshUserAuthsCache(byte[] data) throws IOException {
-    MultiUserAuthorizations multiUserAuths = null;
-    try {
-      multiUserAuths = VisibilityUtils.readUserAuthsFromZKData(data);
-    } catch (DeserializationException dse) {
-      throw new IOException(dse);
-    }
-    this.lock.writeLock().lock();
-    try {
-      for (UserAuthorizations userAuths : multiUserAuths.getUserAuthsList()) {
-        String user = Bytes.toString(userAuths.getUser().toByteArray());
-        this.userAuths.put(user, new HashSet<Integer>(userAuths.getAuthList()));
-      }
-    } finally {
-      this.lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * @param label
-   * @return The ordinal for the label. The ordinal starts from 1. Returns 0 when the passed a non
-   *         existing label.
-   */
-  public int getLabelOrdinal(String label) {
-    Integer ordinal = null;
-    this.lock.readLock().lock();
-    try {
-      ordinal = labels.get(label);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-    if (ordinal != null) {
-      return ordinal.intValue();
-    }
-    // 0 denotes not available
-    return 0;
-  }
-
-  public String getLabel(int ordinal) {
-    this.lock.readLock().lock();
-    try {
-      return this.ordinalVsLabels.get(ordinal);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * @return The total number of visibility labels.
-   */
-  public int getLabelsCount(){
-    return this.labels.size();
-  }
-
-  /**
-   * @param user
-   * @return The labels that the given user is authorized for.
-   */
-  public List<String> getAuths(String user) {
-    List<String> auths = EMPTY_LIST;
-    this.lock.readLock().lock();
-    try {
-      Set<Integer> authOrdinals = userAuths.get(user);
-      if (authOrdinals != null) {
-        auths = new ArrayList<String>(authOrdinals.size());
-        for (Integer authOrdinal : authOrdinals) {
-          auths.add(ordinalVsLabels.get(authOrdinal));
-        }
-      }
-    } finally {
-      this.lock.readLock().unlock();
-    }
-    return auths;
-  }
-
-  /**
-   * Returns the list of ordinals of authentications associated with the user
-   *
-   * @param user
-   * @return the list of ordinals
-   */
-  public Set<Integer> getAuthsAsOrdinals(String user) {
-    this.lock.readLock().lock();
-    try {
-      return userAuths.get(user);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Writes the labels data to zookeeper node.
-   * @param data
-   * @param labelsOrUserAuths true for writing labels and false for user auths.
-   */
-  public void writeToZookeeper(byte[] data, boolean labelsOrUserAuths) {
-    this.zkVisibilityWatcher.writeToZookeeper(data, labelsOrUserAuths);
-  }
-}