You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2014/08/20 11:17:59 UTC

[3/3] git commit: HBASE-11553 Abstract visibility label related services into an interface.

HBASE-11553 Abstract visibility label related services into an interface.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/062f6a6a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/062f6a6a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/062f6a6a

Branch: refs/heads/master
Commit: 062f6a6aac99a11e3f833a6d460d458452624425
Parents: dd3c9da
Author: anoopsjohn <an...@gmail.com>
Authored: Wed Aug 20 14:47:29 2014 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Wed Aug 20 14:47:29 2014 +0530

----------------------------------------------------------------------
 .../visibility/VisibilityConstants.java         |   6 +-
 .../hbase/zookeeper/ZooKeeperListener.java      |   9 +-
 .../java/org/apache/hadoop/hbase/TagType.java   |   2 +-
 .../hadoop/hbase/mapreduce/LabelExpander.java   |   9 +-
 .../hbase/regionserver/OperationStatus.java     |   7 +-
 .../visibility/DefaultScanLabelGenerator.java   |   6 +-
 .../DefaultVisibilityLabelServiceImpl.java      | 698 +++++++++++++++
 .../visibility/EnforcingScanLabelGenerator.java |   6 +-
 .../visibility/VisibilityController.java        | 843 +++++--------------
 .../visibility/VisibilityExpEvaluator.java      |  42 +
 .../visibility/VisibilityLabelFilter.java       |  54 +-
 .../visibility/VisibilityLabelService.java      | 142 ++++
 .../VisibilityLabelServiceManager.java          | 102 +++
 .../visibility/VisibilityLabelsCache.java       | 233 +++++
 .../visibility/VisibilityLabelsManager.java     | 198 -----
 .../visibility/VisibilityScanDeleteTracker.java | 296 ++++---
 .../security/visibility/VisibilityUtils.java    | 215 ++---
 .../visibility/ZKVisibilityLabelWatcher.java    |  10 +-
 .../ExpAsStringVisibilityLabelServiceImpl.java  | 395 +++++++++
 .../visibility/TestVisibilityLabels.java        | 190 ++---
 ...VisibilityLabelsWithCustomVisLabService.java |  79 ++
 ...ibilityLabelsWithDefaultVisLabelService.java | 142 ++++
 ...isibilityLabelsWithDistributedLogReplay.java |   4 +-
 .../TestVisibilityWithCheckAuths.java           |   4 +-
 24 files changed, 2374 insertions(+), 1318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java
index d91f0ef..b56f250 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java
@@ -44,10 +44,10 @@ public final class VisibilityConstants {
    * Visibility serialization version format. It indicates the visibility labels
    * are sorted based on ordinal
    **/
-  public static final byte VISIBILITY_SERIALIZATION_VERSION = 1;
+  public static final byte SORTED_ORDINAL_SERIALIZATION_FORMAT = 1;
   /** Byte representation of the visibility_serialization_version **/
-  public static final byte[] SORTED_ORDINAL_SERIALIZATION_FORMAT =
-      new byte[] { VISIBILITY_SERIALIZATION_VERSION };
+  public static final byte[] SORTED_ORDINAL_SERIALIZATION_FORMAT_TAG_VAL =
+      new byte[] { SORTED_ORDINAL_SERIALIZATION_FORMAT };
 
   public static final String CHECK_AUTHS_FOR_MUTATION = 
       "hbase.security.visibility.mutations.checkauths";

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java
index 56fe348..178b259 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hbase.zookeeper;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
 
 /**
  * Base class for internal listeners of ZooKeeper events.
@@ -78,4 +76,11 @@ public abstract class ZooKeeperListener {
   public void nodeChildrenChanged(String path) {
     // no-op
   }
+
+  /**
+   * @return The watcher associated with this listener
+   */
+  public ZooKeeperWatcher getWatcher() {
+    return this.watcher;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index 68018c4..a21f7de 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -27,5 +27,5 @@ public final class TagType {
   public static final byte ACL_TAG_TYPE = (byte) 1;
   public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
   public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
-  public static final byte VISIBILITY_EXP_SERIALIZATION_TAG_TYPE = (byte)4;
+  public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LabelExpander.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LabelExpander.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LabelExpander.java
index 60a3bc3..e4d4fe9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LabelExpander.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LabelExpander.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.HTable;
@@ -85,11 +86,11 @@ public class LabelExpander {
     // We will be adding this tag before the visibility tags and the presence of
     // this
     // tag indicates we are supporting deletes with cell visibility
-    tags.add(VisibilityUtils.VIS_SERIALIZATION_TAG);
+    tags.add(VisibilityUtils.SORTED_ORDINAL_SERIALIZATION_FORMAT_TAG);
     if (node.isSingleNode()) {
       getLabelOrdinals(node, labelOrdinals);
       writeLabelOrdinalsToStream(labelOrdinals, dos);
-      tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray()));
+      tags.add(new Tag(TagType.VISIBILITY_TAG_TYPE, baos.toByteArray()));
       baos.reset();
     } else {
       NonLeafExpressionNode nlNode = (NonLeafExpressionNode) node;
@@ -97,14 +98,14 @@ public class LabelExpander {
         for (ExpressionNode child : nlNode.getChildExps()) {
           getLabelOrdinals(child, labelOrdinals);
           writeLabelOrdinalsToStream(labelOrdinals, dos);
-          tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray()));
+          tags.add(new Tag(TagType.VISIBILITY_TAG_TYPE, baos.toByteArray()));
           baos.reset();
           labelOrdinals.clear();
         }
       } else {
         getLabelOrdinals(nlNode, labelOrdinals);
         writeLabelOrdinalsToStream(labelOrdinals, dos);
-        tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, baos.toByteArray()));
+        tags.add(new Tag(TagType.VISIBILITY_TAG_TYPE, baos.toByteArray()));
         baos.reset();
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
index e78f27a..aa377c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
@@ -56,7 +56,11 @@ public class OperationStatus {
     this.exceptionMsg = exceptionMsg;
   }
 
-  
+  public OperationStatus(OperationStatusCode code, Exception e) {
+    this.code = code;
+    this.exceptionMsg = (e == null) ? "" : e.getClass().getName() + ": " + e.getMessage();
+  }
+
   /**
    * @return OperationStatusCode
    */
@@ -70,5 +74,4 @@ public class OperationStatus {
   public String getExceptionMsg() {
     return exceptionMsg;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultScanLabelGenerator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultScanLabelGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultScanLabelGenerator.java
index 5e2368b..19477b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultScanLabelGenerator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultScanLabelGenerator.java
@@ -38,10 +38,10 @@ public class DefaultScanLabelGenerator implements ScanLabelGenerator {
   
   private Configuration conf;
   
-  private VisibilityLabelsManager labelsManager;
+  private VisibilityLabelsCache labelsCache;
   
   public DefaultScanLabelGenerator() {
-    this.labelsManager = VisibilityLabelsManager.get();
+    this.labelsCache = VisibilityLabelsCache.get();
   }
 
   @Override
@@ -59,7 +59,7 @@ public class DefaultScanLabelGenerator implements ScanLabelGenerator {
     if (authorizations != null) {
       List<String> labels = authorizations.getLabels();
       String userName = user.getShortName();
-      List<String> auths = this.labelsManager.getAuths(userName);
+      List<String> auths = this.labelsCache.getAuths(userName);
       return dropLabelsNotInUserAuths(labels, auths, userName);
     }
     return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultVisibilityLabelServiceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultVisibilityLabelServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultVisibilityLabelServiceImpl.java
new file mode 100644
index 0000000..5fef8f3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultVisibilityLabelServiceImpl.java
@@ -0,0 +1,698 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import static org.apache.hadoop.hbase.TagType.VISIBILITY_TAG_TYPE;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityUtils.SYSTEM_LABEL;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.LeafExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.NonLeafExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.Operator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+import com.google.common.collect.Lists;
+
+@InterfaceAudience.Private
+public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService {
+
+  private static final Log LOG = LogFactory.getLog(DefaultVisibilityLabelServiceImpl.class);
+
+  // The "system" label has the ordinal value 1.
+  private static final int SYSTEM_LABEL_ORDINAL = 1;
+  private static final Tag[] LABELS_TABLE_TAGS = new Tag[1];
+  private static final byte[] DUMMY_VALUE = new byte[0];
+
+  private volatile int ordinalCounter = -1;
+  private final ExpressionParser expressionParser = new ExpressionParser();
+  private final ExpressionExpander expressionExpander = new ExpressionExpander();
+  private Configuration conf;
+  private HRegion labelsRegion;
+  private VisibilityLabelsCache labelsCache;
+  private List<ScanLabelGenerator> scanLabelGenerators;
+
+  static {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    try {
+      StreamUtils.writeRawVInt32(dos, SYSTEM_LABEL_ORDINAL);
+    } catch (IOException e) {
+      // We write to a byte array. No Exception can happen.
+    }
+    LABELS_TABLE_TAGS[0] = new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray());
+  }
+
+  public DefaultVisibilityLabelServiceImpl() {
+
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void init(RegionCoprocessorEnvironment e) throws IOException {
+    ZooKeeperWatcher zk = e.getRegionServerServices().getZooKeeper();
+    try {
+      labelsCache = VisibilityLabelsCache.createAndGet(zk, this.conf);
+    } catch (IOException ioe) {
+      LOG.error("Error creating VisibilityLabelsCache", ioe);
+      throw ioe;
+    }
+    this.scanLabelGenerators = VisibilityUtils.getScanLabelGenerators(this.conf);
+    if (e.getRegion().getRegionInfo().getTable().equals(LABELS_TABLE_NAME)) {
+      this.labelsRegion = e.getRegion();
+      Pair<Map<String, Integer>, Map<String, List<Integer>>> labelsAndUserAuths =
+          extractLabelsAndAuths(getExistingLabelsWithAuths());
+      Map<String, Integer> labels = labelsAndUserAuths.getFirst();
+      Map<String, List<Integer>> userAuths = labelsAndUserAuths.getSecond();
+      // Add the "system" label if it is not added into the system yet
+      addSystemLabel(this.labelsRegion, labels, userAuths);
+      int ordinal = SYSTEM_LABEL_ORDINAL; // Ordinal 1 is reserved for "system" label.
+      for (Integer i : labels.values()) {
+        if (i > ordinal) {
+          ordinal = i;
+        }
+      }
+      this.ordinalCounter = ordinal + 1;
+      if (labels.size() > 0) {
+        // If there is no data need not write to zk
+        byte[] serialized = VisibilityUtils.getDataToWriteToZooKeeper(labels);
+        this.labelsCache.writeToZookeeper(serialized, true);
+        this.labelsCache.refreshLabelsCache(serialized);
+      }
+      if (userAuths.size() > 0) {
+        byte[] serialized = VisibilityUtils.getUserAuthsDataToWriteToZooKeeper(userAuths);
+        this.labelsCache.writeToZookeeper(serialized, false);
+        this.labelsCache.refreshUserAuthsCache(serialized);
+      }
+    }
+  }
+
+  protected List<List<Cell>> getExistingLabelsWithAuths() throws IOException {
+    Scan scan = new Scan();
+    RegionScanner scanner = labelsRegion.getScanner(scan);
+    List<List<Cell>> existingLabels = new ArrayList<List<Cell>>();
+    try {
+      while (true) {
+        List<Cell> cells = new ArrayList<Cell>();
+        scanner.next(cells);
+        if (cells.isEmpty()) {
+          break;
+        }
+        existingLabels.add(cells);
+      }
+    } finally {
+      scanner.close();
+    }
+    return existingLabels;
+  }
+
+  protected Pair<Map<String, Integer>, Map<String, List<Integer>>> extractLabelsAndAuths(
+      List<List<Cell>> labelDetails) {
+    Map<String, Integer> labels = new HashMap<String, Integer>();
+    Map<String, List<Integer>> userAuths = new HashMap<String, List<Integer>>();
+    for (List<Cell> cells : labelDetails) {
+      for (Cell cell : cells) {
+        if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(),
+            cell.getQualifierLength(), LABEL_QUALIFIER, 0, LABEL_QUALIFIER.length)) {
+          labels.put(
+              Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()),
+              Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
+        } else {
+          // These are cells of users who have authorization for this label
+          String user = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
+              cell.getQualifierLength());
+          List<Integer> auths = userAuths.get(user);
+          if (auths == null) {
+            auths = new ArrayList<Integer>();
+            userAuths.put(user, auths);
+          }
+          auths.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
+        }
+      }
+    }
+    return new Pair<Map<String, Integer>, Map<String, List<Integer>>>(labels, userAuths);
+  }
+
+  protected void addSystemLabel(HRegion region, Map<String, Integer> labels,
+      Map<String, List<Integer>> userAuths) throws IOException {
+    if (!labels.containsKey(SYSTEM_LABEL)) {
+      Put p = new Put(Bytes.toBytes(SYSTEM_LABEL_ORDINAL));
+      p.addImmutable(LABELS_TABLE_FAMILY, LABEL_QUALIFIER, Bytes.toBytes(SYSTEM_LABEL));
+      // Set auth for "system" label for all super users.
+      List<String> superUsers = getSystemAndSuperUsers();
+      for (String superUser : superUsers) {
+        p.addImmutable(LABELS_TABLE_FAMILY, Bytes.toBytes(superUser), DUMMY_VALUE,
+            LABELS_TABLE_TAGS);
+      }
+      region.put(p);
+      labels.put(SYSTEM_LABEL, SYSTEM_LABEL_ORDINAL);
+      for (String superUser : superUsers) {
+        List<Integer> auths = userAuths.get(superUser);
+        if (auths == null) {
+          auths = new ArrayList<Integer>(1);
+          userAuths.put(superUser, auths);
+        }
+        auths.add(SYSTEM_LABEL_ORDINAL);
+      }
+    }
+  }
+
+  protected List<String> getSystemAndSuperUsers() throws IOException {
+    User user = User.getCurrent();
+    if (user == null) {
+      throw new IOException("Unable to obtain the current user, "
+          + "authorization checks for internal operations will not work correctly!");
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Current user name is " + user.getShortName());
+    }
+    String currentUser = user.getShortName();
+    List<String> superUsers = Lists.asList(currentUser,
+        this.conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
+    return superUsers;
+  }
+
+  @Override
+  public OperationStatus[] addLabels(List<byte[]> labels) throws IOException {
+    assert labelsRegion != null;
+    OperationStatus[] finalOpStatus = new OperationStatus[labels.size()];
+    List<Mutation> puts = new ArrayList<Mutation>(labels.size());
+    int i = 0;
+    for (byte[] label : labels) {
+      String labelStr = Bytes.toString(label);
+      if (this.labelsCache.getLabelOrdinal(labelStr) > 0) {
+        finalOpStatus[i] = new OperationStatus(OperationStatusCode.FAILURE,
+            new LabelAlreadyExistsException("Label '" + labelStr + "' already exists"));
+      } else {
+        Put p = new Put(Bytes.toBytes(ordinalCounter));
+        p.addImmutable(LABELS_TABLE_FAMILY, LABEL_QUALIFIER, label, LABELS_TABLE_TAGS);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding the label " + labelStr);
+        }
+        puts.add(p);
+        ordinalCounter++;
+      }
+      i++;
+    }
+    if (mutateLabelsRegion(puts, finalOpStatus)) {
+      updateZk(true);
+    }
+    return finalOpStatus;
+  }
+
+  @Override
+  public OperationStatus[] setAuths(byte[] user, List<byte[]> authLabels) throws IOException {
+    assert labelsRegion != null;
+    OperationStatus[] finalOpStatus = new OperationStatus[authLabels.size()];
+    List<Mutation> puts = new ArrayList<Mutation>(authLabels.size());
+    int i = 0;
+    for (byte[] auth : authLabels) {
+      String authStr = Bytes.toString(auth);
+      int labelOrdinal = this.labelsCache.getLabelOrdinal(authStr);
+      if (labelOrdinal == 0) {
+        // This label is not yet added. It must first be added to the system.
+        finalOpStatus[i] = new OperationStatus(OperationStatusCode.FAILURE,
+            new InvalidLabelException("Label '" + authStr + "' doesn't exists"));
+      } else {
+        Put p = new Put(Bytes.toBytes(labelOrdinal));
+        p.addImmutable(LABELS_TABLE_FAMILY, user, DUMMY_VALUE, LABELS_TABLE_TAGS);
+        puts.add(p);
+      }
+      i++;
+    }
+    if (mutateLabelsRegion(puts, finalOpStatus)) {
+      updateZk(false);
+    }
+    return finalOpStatus;
+  }
+
+  @Override
+  public OperationStatus[] clearAuths(byte[] user, List<byte[]> authLabels) throws IOException {
+    assert labelsRegion != null;
+    OperationStatus[] finalOpStatus = new OperationStatus[authLabels.size()];
+    List<String> currentAuths = this.getAuths(user, true);
+    List<Mutation> deletes = new ArrayList<Mutation>(authLabels.size());
+    int i = 0;
+    for (byte[] authLabel : authLabels) {
+      String authLabelStr = Bytes.toString(authLabel);
+      if (currentAuths.contains(authLabelStr)) {
+        int labelOrdinal = this.labelsCache.getLabelOrdinal(authLabelStr);
+        assert labelOrdinal > 0;
+        Delete d = new Delete(Bytes.toBytes(labelOrdinal));
+        d.deleteColumns(LABELS_TABLE_FAMILY, user);
+        deletes.add(d);
+      } else {
+        // This label is not set for the user.
+        finalOpStatus[i] = new OperationStatus(OperationStatusCode.FAILURE,
+            new InvalidLabelException("Label '" + authLabelStr + "' is not set for the user "
+                + Bytes.toString(user)));
+      }
+      i++;
+    }
+    if (mutateLabelsRegion(deletes, finalOpStatus)) {
+      updateZk(false);
+    }
+    return finalOpStatus;
+  }
+
+  /**
+   * Applies the mutations to the labels region and stores the results in finalOpStatus.
+   * finalOpStatus may already contain entries whose OpStatus is FAILURE; those are left as they
+   * are and the remaining (null) slots are filled, in order, with the mutation results.
+   * @param mutations
+   * @param finalOpStatus
+   * @return whether we need a ZK update or not.
+   */
+  private boolean mutateLabelsRegion(List<Mutation> mutations, OperationStatus[] finalOpStatus)
+      throws IOException {
+    OperationStatus[] opStatus = this.labelsRegion.batchMutate(mutations
+        .toArray(new Mutation[mutations.size()]));
+    int i = 0;
+    boolean updateZk = false;
+    for (OperationStatus status : opStatus) {
+      // Update the zk when at least one of the mutations was applied successfully.
+      updateZk = updateZk || (status.getOperationStatusCode() == OperationStatusCode.SUCCESS);
+      for (; i < finalOpStatus.length; i++) {
+        if (finalOpStatus[i] == null) {
+          finalOpStatus[i] = status;
+          break;
+        }
+      }
+    }
+    return updateZk;
+  }
+
+  @Override
+  public List<String> getAuths(byte[] user, boolean systemCall) throws IOException {
+    assert (labelsRegion != null || systemCall);
+    if (systemCall || labelsRegion == null) {
+      return this.labelsCache.getAuths(Bytes.toString(user));
+    }
+    Scan s = new Scan();
+    s.addColumn(LABELS_TABLE_FAMILY, user);
+    Filter filter = VisibilityUtils.createVisibilityLabelFilter(this.labelsRegion,
+        new Authorizations(SYSTEM_LABEL));
+    s.setFilter(filter);
+    List<String> auths = new ArrayList<String>();
+    RegionScanner scanner = this.labelsRegion.getScanner(s);
+    List<Cell> results = new ArrayList<Cell>(1);
+    while (true) {
+      scanner.next(results);
+      if (results.isEmpty()) break;
+      Cell cell = results.get(0);
+      int ordinal = Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+      String label = this.labelsCache.getLabel(ordinal);
+      if (label != null) {
+        auths.add(label);
+      }
+      results.clear();
+    }
+    return auths;
+  }
+
+  @Override
+  public List<Tag> createVisibilityExpTags(String visExpression, boolean withSerializationFormat,
+      boolean checkAuths) throws IOException {
+    ExpressionNode node = null;
+    try {
+      node = this.expressionParser.parse(visExpression);
+    } catch (ParseException e) {
+      throw new IOException(e);
+    }
+    node = this.expressionExpander.expand(node);
+    List<Tag> tags = new ArrayList<Tag>();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    List<Integer> labelOrdinals = new ArrayList<Integer>();
+    // We will be adding this tag before the visibility tags and the presence of this
+    // tag indicates we are supporting deletes with cell visibility
+    if (withSerializationFormat) {
+      tags.add(VisibilityUtils.SORTED_ORDINAL_SERIALIZATION_FORMAT_TAG);
+    }
+    Set<Integer> auths = null;
+    if (checkAuths) {
+      auths = this.labelsCache.getAuthsAsOrdinals(VisibilityUtils.getActiveUser().getShortName());
+    }
+    if (node.isSingleNode()) {
+      getLabelOrdinals(node, labelOrdinals, auths, checkAuths);
+      writeLabelOrdinalsToStream(labelOrdinals, dos);
+      tags.add(new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray()));
+      baos.reset();
+    } else {
+      NonLeafExpressionNode nlNode = (NonLeafExpressionNode) node;
+      if (nlNode.getOperator() == Operator.OR) {
+        for (ExpressionNode child : nlNode.getChildExps()) {
+          getLabelOrdinals(child, labelOrdinals, auths, checkAuths);
+          writeLabelOrdinalsToStream(labelOrdinals, dos);
+          tags.add(new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray()));
+          baos.reset();
+          labelOrdinals.clear();
+        }
+      } else {
+        getLabelOrdinals(nlNode, labelOrdinals, auths, checkAuths);
+        writeLabelOrdinalsToStream(labelOrdinals, dos);
+        tags.add(new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray()));
+        baos.reset();
+      }
+    }
+    return tags;
+  }
+
+  protected void getLabelOrdinals(ExpressionNode node, List<Integer> labelOrdinals,
+      Set<Integer> auths, boolean checkAuths) throws IOException, InvalidLabelException {
+    if (node.isSingleNode()) {
+      String identifier = null;
+      int labelOrdinal = 0;
+      if (node instanceof LeafExpressionNode) {
+        identifier = ((LeafExpressionNode) node).getIdentifier();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("The identifier is " + identifier);
+        }
+        labelOrdinal = this.labelsCache.getLabelOrdinal(identifier);
+        checkAuths(auths, labelOrdinal, identifier, checkAuths);
+      } else {
+        // This is a NOT node.
+        LeafExpressionNode lNode = (LeafExpressionNode) ((NonLeafExpressionNode) node)
+            .getChildExps().get(0);
+        identifier = lNode.getIdentifier();
+        labelOrdinal = this.labelsCache.getLabelOrdinal(identifier);
+        checkAuths(auths, labelOrdinal, identifier, checkAuths);
+        labelOrdinal = -1 * labelOrdinal; // Store NOT node as -ve ordinal.
+      }
+      if (labelOrdinal == 0) {
+        throw new InvalidLabelException("Invalid visibility label " + identifier);
+      }
+      labelOrdinals.add(labelOrdinal);
+    } else {
+      List<ExpressionNode> childExps = ((NonLeafExpressionNode) node).getChildExps();
+      for (ExpressionNode child : childExps) {
+        getLabelOrdinals(child, labelOrdinals, auths, checkAuths);
+      }
+    }
+  }
+
+  private void checkAuths(Set<Integer> auths, int labelOrdinal, String identifier,
+      boolean checkAuths) throws IOException {
+    if (checkAuths) {
+      if (auths == null || (!auths.contains(labelOrdinal))) {
+        throw new AccessDeniedException("Visibility label " + identifier
+            + " not authorized for the user " + VisibilityUtils.getActiveUser().getShortName());
+      }
+    }
+  }
+
+  /**
+   * This will sort the passed labels in ascending order and then will write one after the other
+   * to the passed stream.
+   * @param labelOrdinals Unsorted label ordinals
+   * @param dos Stream where to write the labels.
+   * @throws IOException When IOE during writes to Stream.
+   */
+  protected void writeLabelOrdinalsToStream(List<Integer> labelOrdinals, DataOutputStream dos)
+      throws IOException {
+    Collections.sort(labelOrdinals);
+    for (Integer labelOrdinal : labelOrdinals) {
+      StreamUtils.writeRawVInt32(dos, labelOrdinal);
+    }
+  }
+
+  protected void updateZk(boolean labelAddition) throws IOException {
+    // We will add to zookeeper here.
+    // TODO we should write only the delta to zk. Otherwise this is a very heavy op: when there
+    // are many labels and auths in the system we end up writing lots of data to zk, and will
+    // quite possibly exceed the zk node data limit!
+    Pair<Map<String, Integer>, Map<String, List<Integer>>> labelsAndUserAuths =
+        extractLabelsAndAuths(getExistingLabelsWithAuths());
+    Map<String, Integer> existingLabels = labelsAndUserAuths.getFirst();
+    Map<String, List<Integer>> userAuths = labelsAndUserAuths.getSecond();
+    if (labelAddition) {
+      byte[] serialized = VisibilityUtils.getDataToWriteToZooKeeper(existingLabels);
+      this.labelsCache.writeToZookeeper(serialized, true);
+    } else {
+      byte[] serialized = VisibilityUtils.getUserAuthsDataToWriteToZooKeeper(userAuths);
+      this.labelsCache.writeToZookeeper(serialized, false);
+    }
+  }
+
+  /**
+   * Creates the evaluator that decides, per cell, whether the cell can be included in the
+   * result of the current read.
+   * <p>
+   * For a super user an evaluator that accepts every cell is returned. Otherwise the effective
+   * auth labels are computed by running the configured scan label generators and the returned
+   * evaluator checks the visibility tags of each cell against those labels.
+   * @param authorizations Authorizations passed with the read; handed to the first scan label
+   *          generator as is (may be null, see the comment inside the loop).
+   * @return The evaluator to be applied on the cells.
+   * @throws IOException When finding the user auths fails or any scan label generator throws.
+   */
+  @Override
+  public VisibilityExpEvaluator getVisibilityExpEvaluator(Authorizations authorizations)
+      throws IOException {
+    // If a super user issues a get/scan, he should be able to scan the cells
+    // irrespective of the Visibility labels
+    if (isReadFromSuperUser()) {
+      return new VisibilityExpEvaluator() {
+        @Override
+        public boolean evaluate(Cell cell) throws IOException {
+          return true;
+        }
+      };
+    }
+    List<String> authLabels = null;
+    // The generators are run in order. Each one gets the Authorizations built from the labels
+    // returned by the previous one, and the labels from the last generator are the ones used.
+    for (ScanLabelGenerator scanLabelGenerator : scanLabelGenerators) {
+      try {
+        // null authorizations to be handled inside SLG impl.
+        authLabels = scanLabelGenerator.getLabels(VisibilityUtils.getActiveUser(), authorizations);
+        authLabels = (authLabels == null) ? new ArrayList<String>() : authLabels;
+        authorizations = new Authorizations(authLabels);
+      } catch (Throwable t) {
+        LOG.error(t);
+        throw new IOException(t);
+      }
+    }
+    int labelsCount = this.labelsCache.getLabelsCount();
+    // Bit i is set when the label with ordinal i is one of the effective auth labels.
+    final BitSet bs = new BitSet(labelsCount + 1); // ordinal is index 1 based
+    if (authLabels != null) {
+      for (String authLabel : authLabels) {
+        int labelOrdinal = this.labelsCache.getLabelOrdinal(authLabel);
+        // Ordinal 0 is skipped. Presumably the cache returns 0 for a label it does not know
+        // about - TODO confirm.
+        if (labelOrdinal != 0) {
+          bs.set(labelOrdinal);
+        }
+      }
+    }
+
+    return new VisibilityExpEvaluator() {
+      @Override
+      public boolean evaluate(Cell cell) throws IOException {
+        boolean visibilityTagPresent = false;
+        // Save an object allocation where we can
+        if (cell.getTagsLength() > 0) {
+          Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+              cell.getTagsLength());
+          while (tagsItr.hasNext()) {
+            boolean includeKV = true;
+            Tag tag = tagsItr.next();
+            if (tag.getType() == VISIBILITY_TAG_TYPE) {
+              visibilityTagPresent = true;
+              int offset = tag.getTagOffset();
+              int endOffset = offset + tag.getTagLength();
+              // The tag value is read as a sequence of vint label ordinals. Every ordinal in the
+              // tag has to be satisfied; the first failing one rejects this tag.
+              while (offset < endOffset) {
+                Pair<Integer, Integer> result = StreamUtils
+                    .readRawVarint32(tag.getBuffer(), offset);
+                int currLabelOrdinal = result.getFirst();
+                if (currLabelOrdinal < 0) {
+                  // check for the absence of this label in the Scan Auth labels
+                  // ie. to check BitSet corresponding bit is 0
+                  int temp = -currLabelOrdinal;
+                  if (bs.get(temp)) {
+                    includeKV = false;
+                    break;
+                  }
+                } else {
+                  // Positive ordinal: the label has to be present in the auth labels.
+                  if (!bs.get(currLabelOrdinal)) {
+                    includeKV = false;
+                    break;
+                  }
+                }
+                // Second of the pair is used as the number of bytes consumed by this vint.
+                offset += result.getSecond();
+              }
+              if (includeKV) {
+                // We got one visibility expression getting evaluated to true. Good to include this
+                // KV in the result then.
+                return true;
+              }
+            }
+          }
+        }
+        // Reaching here means no visibility tag passed. The cell is included only when it has
+        // no visibility tag at all.
+        return !(visibilityTagPresent);
+      }
+    };
+  }
+
+  /**
+   * @return true when the active user, looked up by short name, is having the system auth.
+   * @throws IOException When finding the active user or the user auths fails.
+   */
+  protected boolean isReadFromSuperUser() throws IOException {
+    String activeUserName = VisibilityUtils.getActiveUser().getShortName();
+    return havingSystemAuth(Bytes.toBytes(activeUserName));
+  }
+
+  /**
+   * Tells whether the auths found for the given user include the system label.
+   * @param user User name bytes
+   * @return true when the system label is among the auths of the user.
+   * @throws IOException When finding the auths fails.
+   */
+  @Override
+  public boolean havingSystemAuth(byte[] user) throws IOException {
+    final List<String> userAuths = this.getAuths(user, true);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("The auths for user " + Bytes.toString(user) + " are " + userAuths);
+    }
+    return userAuths.contains(SYSTEM_LABEL);
+  }
+
+  @Override
+  public boolean matchVisibility(List<Tag> putVisTags, Byte putTagsFormat, List<Tag> deleteVisTags,
+      Byte deleteTagsFormat) throws IOException {
+    if ((deleteTagsFormat != null && deleteTagsFormat == SORTED_ORDINAL_SERIALIZATION_FORMAT)
+        && (putTagsFormat == null || putTagsFormat == SORTED_ORDINAL_SERIALIZATION_FORMAT)) {
+      if (putVisTags.size() == 0) {
+        // Early out if there are no tags in the cell
+        return false;
+      }
+      if (putTagsFormat == null) {
+        return matchUnSortedVisibilityTags(putVisTags, deleteVisTags);
+      } else {
+        return matchOrdinalSortedVisibilityTags(putVisTags, deleteVisTags);
+      }
+    }
+    throw new IOException("Unexpected tag format passed for comparison, deleteTagsFormat : "
+        + deleteTagsFormat + ", putTagsFormat : " + putTagsFormat);
+  }
+
+  /**
+   * Matches the tags after decoding each visibility tag into its sorted list of ordinals.
+   * This is used when at least one set of tags is not sorted based on the label ordinal.
+   * @param putVisTags Visibility tags in Put Mutation
+   * @param deleteVisTags Visibility tags in Delete Mutation
+   * @return true when all the visibility tags in Put match with the visibility tags in Delete.
+   */
+  private static boolean matchUnSortedVisibilityTags(List<Tag> putVisTags,
+      List<Tag> deleteVisTags) throws IOException {
+    List<List<Integer>> putOrdinals = sortTagsBasedOnOrdinal(putVisTags);
+    List<List<Integer>> deleteOrdinals = sortTagsBasedOnOrdinal(deleteVisTags);
+    return compareTagsOrdinals(putOrdinals, deleteOrdinals);
+  }
+
+  /**
+   * Matches the tags by comparing their raw bytes. This is used when both the sets of tags are
+   * sorted based on the label ordinal.
+   * @param putVisTags Visibility tags in Put Mutation
+   * @param deleteVisTags Visibility tags in Delete Mutation
+   * @return true when both lists have the same size and every tag in Delete has a tag with
+   *         equal bytes in Put. Two empty lists give false.
+   */
+  private static boolean matchOrdinalSortedVisibilityTags(List<Tag> putVisTags,
+      List<Tag> deleteVisTags) {
+    // If the size does not match. Definitely we are not comparing the equal tags.
+    if (deleteVisTags.size() != putVisTags.size()) {
+      return false;
+    }
+    // Stays false when there is nothing to compare.
+    boolean matched = false;
+    for (Tag deleteTag : deleteVisTags) {
+      matched = false;
+      Iterator<Tag> putItr = putVisTags.iterator();
+      while (!matched && putItr.hasNext()) {
+        Tag putTag = putItr.next();
+        matched = Bytes.equals(deleteTag.getBuffer(), deleteTag.getTagOffset(),
+            deleteTag.getTagLength(), putTag.getBuffer(), putTag.getTagOffset(),
+            putTag.getTagLength());
+      }
+      if (!matched) {
+        return false;
+      }
+    }
+    return matched;
+  }
+
+  /**
+   * Decodes every visibility tag in the passed list into its sorted list of label ordinals.
+   * Tags of any other type are ignored.
+   * @param tags Tags to decode
+   * @return One sorted ordinal list per visibility tag, in the order of the passed tags.
+   */
+  private static List<List<Integer>> sortTagsBasedOnOrdinal(List<Tag> tags) throws IOException {
+    List<List<Integer>> ordinalsPerTag = new ArrayList<List<Integer>>();
+    for (Tag current : tags) {
+      if (current.getType() != VISIBILITY_TAG_TYPE) {
+        continue;
+      }
+      getSortedTagOrdinals(ordinalsPerTag, current);
+    }
+    return ordinalsPerTag;
+  }
+
+  /**
+   * Reads all the vint label ordinals from the passed tag, sorts them in ascending order and
+   * appends the sorted list to the passed list.
+   * @param fullTagsList List to which the sorted ordinals of this tag are added.
+   * @param tag Tag to read the ordinals from.
+   */
+  private static void getSortedTagOrdinals(List<List<Integer>> fullTagsList, Tag tag)
+      throws IOException {
+    List<Integer> ordinals = new ArrayList<Integer>();
+    int pos = tag.getTagOffset();
+    for (int end = pos + tag.getTagLength(); pos < end;) {
+      // First of the pair is the ordinal read; second is by how much to move forward.
+      Pair<Integer, Integer> decoded = StreamUtils.readRawVarint32(tag.getBuffer(), pos);
+      ordinals.add(decoded.getFirst());
+      pos += decoded.getSecond();
+    }
+    Collections.sort(ordinals);
+    fullTagsList.add(ordinals);
+  }
+
+  /*
+   * Compares the ordinal lists of the Put tags with those of the Delete tags.
+   * @return true when both lists have the same size and every ordinal list in Delete has an
+   * equal ordinal list in Put. Two empty lists give false.
+   */
+  private static boolean compareTagsOrdinals(List<List<Integer>> putVisTags,
+      List<List<Integer>> deleteVisTags) {
+    if (deleteVisTags.size() != putVisTags.size()) {
+      return false;
+    }
+    // Stays false when there is nothing to compare.
+    boolean matched = false;
+    for (List<Integer> wanted : deleteVisTags) {
+      matched = false;
+      Iterator<List<Integer>> candidates = putVisTags.iterator();
+      while (!matched && candidates.hasNext()) {
+        matched = wanted.equals(candidates.next());
+      }
+      if (!matched) {
+        return false;
+      }
+    }
+    return matched;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/062f6a6a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/EnforcingScanLabelGenerator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/EnforcingScanLabelGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/EnforcingScanLabelGenerator.java
index 7d0320a..90a8ff9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/EnforcingScanLabelGenerator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/EnforcingScanLabelGenerator.java
@@ -37,10 +37,10 @@ public class EnforcingScanLabelGenerator implements ScanLabelGenerator {
   private static final Log LOG = LogFactory.getLog(EnforcingScanLabelGenerator.class);
 
   private Configuration conf;
-  private VisibilityLabelsManager labelsManager;
+  private VisibilityLabelsCache labelsCache;
 
   public EnforcingScanLabelGenerator() {
-    this.labelsManager = VisibilityLabelsManager.get();
+    this.labelsCache = VisibilityLabelsCache.get();
   }
 
   @Override
@@ -59,7 +59,7 @@ public class EnforcingScanLabelGenerator implements ScanLabelGenerator {
     if (authorizations != null) {
       LOG.warn("Dropping authorizations requested by user " + userName + ": " + authorizations);
     }
-    return this.labelsManager.getAuths(userName);
+    return this.labelsCache.getAuths(userName);
   }
 
 }