You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/04/28 03:35:05 UTC

[2/7] kylin git commit: KYLIN-2535 AclService and UserService store records via ResourceStore interface

KYLIN-2535 AclService and UserService store records via ResourceStore interface

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/95c6d4fa
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/95c6d4fa
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/95c6d4fa

Branch: refs/heads/master
Commit: 95c6d4fa3dcdca737185a9e84dbb98f13ca5dd8f
Parents: ff3e095
Author: xiefan46 <95...@qq.com>
Authored: Fri Apr 7 15:52:44 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Apr 28 11:33:26 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/rest/constant/Constant.java    |   2 +
 .../kylin/rest/security/AclEntityFactory.java   |  12 +-
 .../kylin/rest/security/AclEntityType.java      |  31 ++
 .../kylin/rest/security/AclHBaseStorage.java    |   1 +
 .../kylin/rest/security/AclPermissionType.java  |  37 ++
 .../rest/security/MockAclHBaseStorage.java      |  13 +-
 .../rest/security/RealAclHBaseStorage.java      |  13 +-
 .../apache/kylin/rest/service/AclService.java   | 470 ++++++++++---------
 .../kylin/rest/service/AclServiceOld.java       | 460 ++++++++++++++++++
 .../rest/service/AclTableMigrationTool.java     |  78 +++
 .../apache/kylin/rest/service/UserService.java  | 347 +++++++-------
 .../kylin/rest/service/UserServiceOld.java      | 286 +++++++++++
 .../rest/controller/AccessControllerTest.java   | 152 +++++-
 .../kylin/rest/service/AccessServiceTest.java   |   4 +-
 .../kylin/rest/service/UserServiceTest.java     |   1 +
 15 files changed, 1501 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/constant/Constant.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/constant/Constant.java b/server-base/src/main/java/org/apache/kylin/rest/constant/Constant.java
index f068e5f..eff7b4f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/constant/Constant.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/constant/Constant.java
@@ -43,4 +43,6 @@ public class Constant {
     public final static String SERVER_MODE_JOB = "job";
     public final static String SERVER_MODE_ALL = "all";
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/security/AclEntityFactory.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/AclEntityFactory.java b/server-base/src/main/java/org/apache/kylin/rest/security/AclEntityFactory.java
index 621fbe4..5744bfd 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/AclEntityFactory.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/AclEntityFactory.java
@@ -26,33 +26,33 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 
 /**
  * @author xduo
- * 
  */
-public class AclEntityFactory {
+public class AclEntityFactory implements AclEntityType{
+
 
     public static RootPersistentEntity createAclEntity(String entityType, String uuid) {
-        if ("CubeInstance".equals(entityType)) {
+        if (CUBE_INSTANCE.equals(entityType)) {
             CubeInstance cubeInstance = new CubeInstance();
             cubeInstance.setUuid(uuid);
 
             return cubeInstance;
         }
 
-        if ("DataModelDesc".equals(entityType)) {
+        if (DATA_MODEL_DESC.equals(entityType)) {
             DataModelDesc modelInstance = new DataModelDesc();
             modelInstance.setUuid(uuid);
 
             return modelInstance;
         }
 
-        if ("JobInstance".equals(entityType)) {
+        if (JOB_INSTANCE.equals(entityType)) {
             JobInstance jobInstance = new JobInstance();
             jobInstance.setUuid(uuid);
 
             return jobInstance;
         }
 
-        if ("ProjectInstance".equals(entityType)) {
+        if (PROJECT_INSTANCE.equals(entityType)) {
             ProjectInstance projectInstance = new ProjectInstance();
             projectInstance.setUuid(uuid);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/security/AclEntityType.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/AclEntityType.java b/server-base/src/main/java/org/apache/kylin/rest/security/AclEntityType.java
new file mode 100644
index 0000000..69965f8
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/AclEntityType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.security;
+
+/**
+ * Created by xiefan on 17-4-14.
+ */
+public interface AclEntityType {
+    static final String CUBE_INSTANCE = "CubeInstance";
+
+    static final String DATA_MODEL_DESC = "DataModelDesc";
+
+    static final String JOB_INSTANCE = "JobInstance";
+
+    static final String PROJECT_INSTANCE = "ProjectInstance";
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
index 8095bf8..b595c72 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Table;
 
 /**
  */
+@Deprecated  //use ResourceStore interface instead.
 public interface AclHBaseStorage {
 
     String ACL_INFO_FAMILY = "i";

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/security/AclPermissionType.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/AclPermissionType.java b/server-base/src/main/java/org/apache/kylin/rest/security/AclPermissionType.java
new file mode 100644
index 0000000..45ba4b1
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/AclPermissionType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.security;
+
+/**
+ * Created by xiefan on 17-4-14.
+ */
+public interface AclPermissionType {
+    static final String MANAGEMENT = "MANAGEMENT";
+
+    static final String OPERATION = "OPERATION";
+
+    static final String READ = "READ";
+
+    static final String WRITE = "WRITE";
+
+    static final String CREATE = "CREATE";
+
+    static final String DELETE = "DELETE";
+
+    static final String ADMINISTRATION = "ADMINISTRATION";
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
index cc76b87..5ebb371 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
@@ -18,17 +18,18 @@
 
 package org.apache.kylin.rest.security;
 
-import java.io.IOException;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.rest.service.AclService;
+import org.apache.kylin.rest.service.AclServiceOld;
 import org.apache.kylin.rest.service.QueryService;
-import org.apache.kylin.rest.service.UserService;
+import org.apache.kylin.rest.service.UserServiceOld;
+
+import java.io.IOException;
 
 /**
  */
+@Deprecated
 public class MockAclHBaseStorage implements AclHBaseStorage {
 
     private static final String aclTableName = "MOCK-ACL-TABLE";
@@ -53,10 +54,10 @@ public class MockAclHBaseStorage implements AclHBaseStorage {
             return realAcl.prepareHBaseTable(clazz);
         }
 
-        if (clazz == AclService.class) {
+        if (clazz == AclServiceOld.class) {
             mockedAclTable = new MockHTable(aclTableName, ACL_INFO_FAMILY, ACL_ACES_FAMILY);
             return aclTableName;
-        } else if (clazz == UserService.class) {
+        } else if (clazz == UserServiceOld.class) {
             mockedUserTable = new MockHTable(userTableName, USER_AUTHORITY_FAMILY, QueryService.USER_QUERY_FAMILY);
             return userTableName;
         } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
index d1a1384..657fbbc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
@@ -18,19 +18,20 @@
 
 package org.apache.kylin.rest.security;
 
-import java.io.IOException;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.rest.service.AclService;
+import org.apache.kylin.rest.service.AclServiceOld;
 import org.apache.kylin.rest.service.QueryService;
-import org.apache.kylin.rest.service.UserService;
+import org.apache.kylin.rest.service.UserServiceOld;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 
+import java.io.IOException;
+
 /**
  */
+@Deprecated
 public class RealAclHBaseStorage implements AclHBaseStorage {
 
     private String hbaseUrl;
@@ -45,11 +46,11 @@ public class RealAclHBaseStorage implements AclHBaseStorage {
         hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
         String tableNameBase = kylinConfig.getMetadataUrlPrefix();
 
-        if (clazz == AclService.class) {
+        if (clazz == AclServiceOld.class) {
             aclTableName = tableNameBase + ACL_TABLE_NAME;
             HBaseConnection.createHTableIfNeeded(hbaseUrl, aclTableName, ACL_INFO_FAMILY, ACL_ACES_FAMILY);
             return aclTableName;
-        } else if (clazz == UserService.class) {
+        } else if (clazz == UserServiceOld.class) {
             userTableName = tableNameBase + USER_TABLE_NAME;
             HBaseConnection.createHTableIfNeeded(hbaseUrl, userTableName, USER_AUTHORITY_FAMILY, QueryService.USER_QUERY_FAMILY);
             return userTableName;

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
index b80d97d..5224efa 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -18,31 +18,14 @@
 
 package org.apache.kylin.rest.service;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.rest.security.AclHBaseStorage;
-import org.apache.kylin.rest.util.Serializer;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -71,30 +54,34 @@ import org.springframework.security.util.FieldUtils;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+
+<<<<<<< HEAD
 /**
  * @author xduo
  */
+=======
+>>>>>>> 62cdc0f... KYLIN-2535 AclService and UserService store records via ResourceStore interface
 @Component("aclService")
 public class AclService implements MutableAclService {
 
     private static final Logger logger = LoggerFactory.getLogger(AclService.class);
 
-    private static String ACL_INFO_FAMILY_TYPE_COLUMN = "t";
-    private static String ACL_INFO_FAMILY_OWNER_COLUMN = "o";
-    private static String ACL_INFO_FAMILY_PARENT_COLUMN = "p";
-    private static String ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN = "i";
+    private final Field fieldAces = FieldUtils.getField(AclImpl.class, "aces");
 
-    private Serializer<SidInfo> sidSerializer = new Serializer<SidInfo>(SidInfo.class);
-    private Serializer<DomainObjectInfo> domainObjSerializer = new Serializer<DomainObjectInfo>(DomainObjectInfo.class);
-    private Serializer<AceInfo> aceSerializer = new Serializer<AceInfo>(AceInfo.class);
+    private final Field fieldAcl = FieldUtils.getField(AccessControlEntryImpl.class, "acl");
 
-    private String aclTableName = null;
+    private static final String DIR_PREFIX = "/acl/";
 
-    private final Field fieldAces = FieldUtils.getField(AclImpl.class, "aces");
-    private final Field fieldAcl = FieldUtils.getField(AccessControlEntryImpl.class, "acl");
 
     @Autowired
     protected PermissionGrantingStrategy permissionGrantingStrategy;
@@ -108,8 +95,7 @@ public class AclService implements MutableAclService {
     @Autowired
     protected AuditLogger auditLogger;
 
-    @Autowired
-    protected AclHBaseStorage aclHBaseStorage;
+    protected ResourceStore aclStore;
 
     @Autowired
     protected UserService userService;
@@ -117,46 +103,32 @@ public class AclService implements MutableAclService {
     public AclService() throws IOException {
         fieldAces.setAccessible(true);
         fieldAcl.setAccessible(true);
-    }
-
-    @PostConstruct
-    public void init() throws IOException {
-        aclTableName = aclHBaseStorage.prepareHBaseTable(AclService.class);
+        aclStore = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+        logger.debug("Acl service2 create");
     }
 
     @Override
     public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
+        logger.debug("invoke method : findChildren");
         List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
-        Table htable = null;
         try {
-            htable = aclHBaseStorage.getTable(aclTableName);
-
-            Scan scan = new Scan();
-            SingleColumnValueFilter parentFilter = new SingleColumnValueFilter(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), CompareOp.EQUAL, domainObjSerializer.serialize(new DomainObjectInfo(parentIdentity)));
-            parentFilter.setFilterIfMissing(true);
-            scan.setFilter(parentFilter);
-
-            ResultScanner scanner = htable.getScanner(scan);
-            for (Result result = scanner.next(); result != null; result = scanner.next()) {
-                String id = Bytes.toString(result.getRow());
-                String type = Bytes.toString(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN)));
-
-                oids.add(new ObjectIdentityImpl(type, id));
+            List<AclRecord> allAclRecords = aclStore.getAllResources(String.valueOf(DIR_PREFIX), AclRecord.class, AclRecordSerializer.getInstance());
+            for (AclRecord record : allAclRecords) {
+                DomainObjectInfo parent = record.getParentDomainObjectInfo();
+                if (parent != null && parent.getId().equals(String.valueOf(parentIdentity.getIdentifier()))) {
+                    DomainObjectInfo child = record.getDomainObjectInfo();
+                    oids.add(new ObjectIdentityImpl(child.getType(), child.getId()));
+                }
             }
+            return oids;
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(htable);
         }
-
-        return oids;
     }
 
     @Override
     public Acl readAclById(ObjectIdentity object) throws NotFoundException {
         Map<ObjectIdentity, Acl> aclsMap = readAclsById(Arrays.asList(object), null);
-        //        Assert.isTrue(aclsMap.containsKey(object), "There should have been an Acl entry for ObjectIdentity " + object);
-
         return aclsMap.get(object);
     }
 
@@ -164,7 +136,6 @@ public class AclService implements MutableAclService {
     public Acl readAclById(ObjectIdentity object, List<Sid> sids) throws NotFoundException {
         Map<ObjectIdentity, Acl> aclsMap = readAclsById(Arrays.asList(object), sids);
         Assert.isTrue(aclsMap.containsKey(object), "There should have been an Acl entry for ObjectIdentity " + object);
-
         return aclsMap.get(object);
     }
 
@@ -175,46 +146,40 @@ public class AclService implements MutableAclService {
 
     @Override
     public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
+        logger.debug("invoke method : readAclsById");
         Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
-        Table htable = null;
-        Result result = null;
         try {
-            htable = aclHBaseStorage.getTable(aclTableName);
-
             for (ObjectIdentity oid : oids) {
-                result = htable.get(new Get(Bytes.toBytes(String.valueOf(oid.getIdentifier()))));
-
-                if (null != result && !result.isEmpty()) {
-                    SidInfo owner = sidSerializer.deserialize(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN)));
+                AclRecord record = aclStore.getResource(getQueryKeyById(String.valueOf(oid.getIdentifier())), AclRecord.class, AclRecordSerializer.getInstance());
+                if (record != null) {
+                    SidInfo owner = record.getOwnerInfo();
                     Sid ownerSid = (null == owner) ? null : (owner.isPrincipal() ? new PrincipalSid(owner.getSid()) : new GrantedAuthoritySid(owner.getSid()));
-                    boolean entriesInheriting = Bytes.toBoolean(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN)));
+                    boolean entriesInheriting = record.isEntriesInheriting();
 
                     Acl parentAcl = null;
-                    DomainObjectInfo parentInfo = domainObjSerializer.deserialize(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN)));
-                    if (null != parentInfo) {
-                        ObjectIdentity parentObj = new ObjectIdentityImpl(parentInfo.getType(), parentInfo.getId());
-                        parentAcl = readAclById(parentObj, null);
+                    DomainObjectInfo parent = record.getParentDomainObjectInfo();
+                    if (parent != null) {
+                        ObjectIdentity parentObject = new ObjectIdentityImpl(parent.getType(), parent.getId());
+                        parentAcl = readAclById(parentObject, null);
                     }
 
                     AclImpl acl = new AclImpl(oid, oid.getIdentifier(), aclAuthorizationStrategy, permissionGrantingStrategy, parentAcl, null, entriesInheriting, ownerSid);
-                    genAces(sids, result, acl);
+                    genAces(sids, record, acl);
 
                     aclMaps.put(oid, acl);
                 } else {
                     throw new NotFoundException("Unable to find ACL information for object identity '" + oid + "'");
                 }
             }
+            return aclMaps;
         } catch (IOException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(htable);
+            throw new NotFoundException(e.getMessage(), e);
         }
-
-        return aclMaps;
     }
 
     @Override
     public MutableAcl createAcl(ObjectIdentity objectIdentity) throws AlreadyExistsException {
+        logger.debug("invoke method : createAcl");
         Acl acl = null;
 
         try {
@@ -225,79 +190,54 @@ public class AclService implements MutableAclService {
         if (null != acl) {
             throw new AlreadyExistsException("ACL of " + objectIdentity + " exists!");
         }
-
         Authentication auth = SecurityContextHolder.getContext().getAuthentication();
         PrincipalSid sid = new PrincipalSid(auth);
-
-        Table htable = null;
         try {
-            htable = aclHBaseStorage.getTable(aclTableName);
-
-            Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
-            put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
-            put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
-            put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
-
-            htable.put(put);
-
+            AclRecord record = new AclRecord(new DomainObjectInfo(objectIdentity), null, new SidInfo(sid), true, null);
+            aclStore.putResource(getQueryKeyById(String.valueOf(objectIdentity.getIdentifier())), record, 0, AclRecordSerializer.getInstance());
             logger.debug("ACL of " + objectIdentity + " created successfully.");
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(htable);
         }
-
         return (MutableAcl) readAclById(objectIdentity);
     }
 
     @Override
     public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException {
-        Table htable = null;
         try {
-            htable = aclHBaseStorage.getTable(aclTableName);
-
-            Delete delete = new Delete(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
-
+            logger.debug("invoke method : deleteAcl");
             List<ObjectIdentity> children = findChildren(objectIdentity);
             if (!deleteChildren && children.size() > 0) {
                 throw new ChildrenExistException("Children exists for " + objectIdentity);
             }
-
             for (ObjectIdentity oid : children) {
                 deleteAcl(oid, deleteChildren);
             }
-
-            htable.delete(delete);
-
+            aclStore.deleteResource(getQueryKeyById(String.valueOf(objectIdentity.getIdentifier())));
             logger.debug("ACL of " + objectIdentity + " deleted successfully.");
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(htable);
         }
     }
 
     @Override
-    public MutableAcl updateAcl(MutableAcl acl) throws NotFoundException {
+    public MutableAcl updateAcl(MutableAcl mutableAcl) throws NotFoundException {
         try {
-            readAclById(acl.getObjectIdentity());
+            logger.debug("invoke method : updateAcl");
+            readAclById(mutableAcl.getObjectIdentity());
         } catch (NotFoundException e) {
             throw e;
         }
 
-        Table htable = null;
         try {
-            htable = aclHBaseStorage.getTable(aclTableName);
-
-            Delete delete = new Delete(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
-            delete.deleteFamily(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY));
-            htable.delete(delete);
-
-            Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
-
-            if (null != acl.getParentAcl()) {
-                put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
+            String id = getQueryKeyById(String.valueOf(mutableAcl.getObjectIdentity().getIdentifier()));
+            AclRecord record = aclStore.getResource(id, AclRecord.class, AclRecordSerializer.getInstance());
+            aclStore.deleteResource(id);
+            //logger.debug("Exist? {}", aclStore.exists(id));
+            if (mutableAcl.getParentAcl() != null) {
+                record.setParentDomainObjectInfo(new DomainObjectInfo(mutableAcl.getParentAcl().getObjectIdentity()));
             }
+<<<<<<< HEAD
 
             for (AccessControlEntry ace : acl.getEntries()) {
                 if (ace.getSid() instanceof PrincipalSid) {
@@ -309,26 +249,34 @@ public class AclService implements MutableAclService {
                 }
                 AceInfo aceInfo = new AceInfo(ace);
                 put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
+=======
+            if (record.getAllAceInfo() == null) {
+                record.setAllAceInfo(new HashMap<String, AceInfo>());
+>>>>>>> 62cdc0f... KYLIN-2535 AclService and UserService store records via ResourceStore interface
             }
-
-            if (!put.isEmpty()) {
-                htable.put(put);
-
-                logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully.");
+            Map<String, AceInfo> allAceInfo = record.getAllAceInfo();
+            allAceInfo.clear();
+            for (AccessControlEntry ace : mutableAcl.getEntries()) {
+                AceInfo aceInfo = new AceInfo(ace);
+                allAceInfo.put(String.valueOf(aceInfo.getSidInfo().getSid()), aceInfo);
             }
+            aclStore.putResource(id, record, 0, AclRecordSerializer.getInstance());
+            logger.debug("ACL of " + mutableAcl.getObjectIdentity() + " updated successfully.");
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(htable);
         }
-
-        return (MutableAcl) readAclById(acl.getObjectIdentity());
+        return (MutableAcl) readAclById(mutableAcl.getObjectIdentity());
     }
 
+<<<<<<< HEAD
 
     private void genAces(List<Sid> sids, Result result, AclImpl acl) throws JsonParseException, JsonMappingException, IOException {
+=======
+    private void genAces(List<Sid> sids, AclRecord record, AclImpl acl) throws JsonParseException, JsonMappingException, IOException {
+>>>>>>> 62cdc0f... KYLIN-2535 AclService and UserService store records via ResourceStore interface
         List<AceInfo> aceInfos = new ArrayList<AceInfo>();
-        if (null != sids) {
+        Map<String, AceInfo> allAceInfos = record.getAllAceInfo();
+        if (sids != null) {
             // Just return aces in sids
             for (Sid sid : sids) {
                 String sidName = null;
@@ -337,21 +285,14 @@ public class AclService implements MutableAclService {
                 } else if (sid instanceof GrantedAuthoritySid) {
                     sidName = ((GrantedAuthoritySid) sid).getGrantedAuthority();
                 }
-
-                AceInfo aceInfo = aceSerializer.deserialize(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(sidName)));
-                if (null != aceInfo) {
+                AceInfo aceInfo = allAceInfos.get(sidName);
+                if (aceInfo != null) {
                     aceInfos.add(aceInfo);
                 }
             }
         } else {
-            NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY));
-            for (byte[] qualifier : familyMap.keySet()) {
-                AceInfo aceInfo = aceSerializer.deserialize(familyMap.get(qualifier));
-
-                if (null != aceInfo) {
-                    aceInfos.add(aceInfo);
-                }
-            }
+            if (allAceInfos != null)
+                aceInfos.addAll(allAceInfos.values());
         }
 
         List<AccessControlEntry> newAces = new ArrayList<AccessControlEntry>();
@@ -376,99 +317,210 @@ public class AclService implements MutableAclService {
         }
     }
 
-    protected static class DomainObjectInfo {
-        private String id;
-        private String type;
+    private String getQueryKeyById(String id) {
+        return DIR_PREFIX + id;
+    }
 
-        public DomainObjectInfo() {
-        }
+    /** Two identities are treated as equal when both identifier and type match. */
+    private boolean equal(ObjectIdentity o1, ObjectIdentity o2) {
+        boolean sameIdentifier = o1.getIdentifier().equals(o2.getIdentifier());
+        return sameIdentifier && o1.getType().equals(o2.getType());
+    }
 
-        public DomainObjectInfo(ObjectIdentity oid) {
-            super();
-            this.id = (String) oid.getIdentifier();
-            this.type = oid.getType();
-        }
+    protected static class AclRecordSerializer implements Serializer<AclRecord> {
+
+        private static final AclRecordSerializer serializer = new AclRecordSerializer();
+
+        AclRecordSerializer() {
 
-        public Serializable getId() {
-            return id;
         }
 
-        public void setId(String id) {
-            this.id = id;
+        public static AclRecordSerializer getInstance() {
+            return serializer;
         }
 
-        public String getType() {
-            return type;
+        @Override
+        public void serialize(AclRecord obj, DataOutputStream out) throws IOException {
+            String jsonStr = JsonUtil.writeValueAsString(obj);
+            out.writeUTF(jsonStr);
         }
 
-        public void setType(String type) {
-            this.type = type;
+        @Override
+        public AclRecord deserialize(DataInputStream in) throws IOException {
+            String jsonStr = in.readUTF();
+            return JsonUtil.readValue(jsonStr, AclRecord.class);
         }
     }
 
-    protected static class SidInfo {
-        private String sid;
-        private boolean isPrincipal;
 
-        public SidInfo() {
-        }
+}
 
-        public SidInfo(Sid sid) {
-            if (sid instanceof PrincipalSid) {
-                this.sid = ((PrincipalSid) sid).getPrincipal();
-                this.isPrincipal = true;
-            } else if (sid instanceof GrantedAuthoritySid) {
-                this.sid = ((GrantedAuthoritySid) sid).getGrantedAuthority();
-                this.isPrincipal = false;
-            }
-        }
+class AclRecord extends RootPersistentEntity {
 
-        public String getSid() {
-            return sid;
-        }
+    @JsonProperty()
+    private DomainObjectInfo domainObjectInfo;
 
-        public void setSid(String sid) {
-            this.sid = sid;
-        }
+    @JsonProperty()
+    private DomainObjectInfo parentDomainObjectInfo;
 
-        public boolean isPrincipal() {
-            return isPrincipal;
-        }
+    @JsonProperty()
+    private SidInfo ownerInfo;
 
-        public void setPrincipal(boolean isPrincipal) {
-            this.isPrincipal = isPrincipal;
-        }
+    @JsonProperty()
+    private boolean entriesInheriting;
+
+    @JsonProperty()
+    private Map<String, AceInfo> allAceInfo;
+
+    public AclRecord() {
     }
 
-    protected static class AceInfo {
-        private SidInfo sidInfo;
-        private int permissionMask;
+    public AclRecord(DomainObjectInfo domainObjectInfo, DomainObjectInfo parentDomainObjectInfo, SidInfo ownerInfo, boolean entriesInheriting, Map<String, AceInfo> allAceInfo) {
+        this.domainObjectInfo = domainObjectInfo;
+        this.parentDomainObjectInfo = parentDomainObjectInfo;
+        this.ownerInfo = ownerInfo;
+        this.entriesInheriting = entriesInheriting;
+        this.allAceInfo = allAceInfo;
+    }
 
-        public AceInfo() {
-        }
+    public SidInfo getOwnerInfo() {
+        return ownerInfo;
+    }
 
-        public AceInfo(AccessControlEntry ace) {
-            super();
-            this.sidInfo = new SidInfo(ace.getSid());
-            this.permissionMask = ace.getPermission().getMask();
-        }
+    public void setOwnerInfo(SidInfo ownerInfo) {
+        this.ownerInfo = ownerInfo;
+    }
 
-        public SidInfo getSidInfo() {
-            return sidInfo;
-        }
+    public boolean isEntriesInheriting() {
+        return entriesInheriting;
+    }
 
-        public void setSidInfo(SidInfo sidInfo) {
-            this.sidInfo = sidInfo;
-        }
+    public void setEntriesInheriting(boolean entriesInheriting) {
+        this.entriesInheriting = entriesInheriting;
+    }
 
-        public int getPermissionMask() {
-            return permissionMask;
-        }
+    public DomainObjectInfo getDomainObjectInfo() {
+        return domainObjectInfo;
+    }
+
+    public void setDomainObjectInfo(DomainObjectInfo domainObjectInfo) {
+        this.domainObjectInfo = domainObjectInfo;
+    }
+
+    public DomainObjectInfo getParentDomainObjectInfo() {
+        return parentDomainObjectInfo;
+    }
+
+    public void setParentDomainObjectInfo(DomainObjectInfo parentDomainObjectInfo) {
+        this.parentDomainObjectInfo = parentDomainObjectInfo;
+    }
+
+    public Map<String, AceInfo> getAllAceInfo() {
+        return allAceInfo;
+    }
+
+    public void setAllAceInfo(Map<String, AceInfo> allAceInfo) {
+        this.allAceInfo = allAceInfo;
+    }
+
+}
+
+class DomainObjectInfo {
+    private String id;
+    private String type;
+
+    public DomainObjectInfo() {
+    }
+
+    public DomainObjectInfo(ObjectIdentity oid) {
+        super();
+        this.id = (String) oid.getIdentifier();
+        this.type = oid.getType();
+    }
 
-        public void setPermissionMask(int permissionMask) {
-            this.permissionMask = permissionMask;
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+}
+
+
+class SidInfo {
+    private String sid;
+    private boolean isPrincipal;
+
+    public SidInfo() {
+    }
+
+    public SidInfo(Sid sid) {
+        if (sid instanceof PrincipalSid) {
+            this.sid = ((PrincipalSid) sid).getPrincipal();
+            this.isPrincipal = true;
+        } else if (sid instanceof GrantedAuthoritySid) {
+            this.sid = ((GrantedAuthoritySid) sid).getGrantedAuthority();
+            this.isPrincipal = false;
         }
     }
 
+<<<<<<< HEAD
+=======
+    public String getSid() {
+        return sid;
+    }
+
+    public void setSid(String sid) {
+        this.sid = sid;
+    }
+
+    public boolean isPrincipal() {
+        return isPrincipal;
+    }
+
+    public void setPrincipal(boolean isPrincipal) {
+        this.isPrincipal = isPrincipal;
+    }
+>>>>>>> 62cdc0f... KYLIN-2535 AclService and UserService store records via ResourceStore interface
 
 }
+
+class AceInfo {
+    private SidInfo sidInfo;
+    private int permissionMask;
+
+    public AceInfo() {
+    }
+
+    public AceInfo(AccessControlEntry ace) {
+        super();
+        this.sidInfo = new SidInfo(ace.getSid());
+        this.permissionMask = ace.getPermission().getMask();
+    }
+
+    public SidInfo getSidInfo() {
+        return sidInfo;
+    }
+
+    public void setSidInfo(SidInfo sidInfo) {
+        this.sidInfo = sidInfo;
+    }
+
+    public int getPermissionMask() {
+        return permissionMask;
+    }
+
+    public void setPermissionMask(int permissionMask) {
+        this.permissionMask = permissionMask;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/service/AclServiceOld.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclServiceOld.java b/server-base/src/main/java/org/apache/kylin/rest/service/AclServiceOld.java
new file mode 100644
index 0000000..8109562
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclServiceOld.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.rest.security.AclHBaseStorage;
+import org.apache.kylin.rest.util.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.acls.domain.AccessControlEntryImpl;
+import org.springframework.security.acls.domain.AclAuthorizationStrategy;
+import org.springframework.security.acls.domain.AclImpl;
+import org.springframework.security.acls.domain.AuditLogger;
+import org.springframework.security.acls.domain.GrantedAuthoritySid;
+import org.springframework.security.acls.domain.ObjectIdentityImpl;
+import org.springframework.security.acls.domain.PermissionFactory;
+import org.springframework.security.acls.domain.PrincipalSid;
+import org.springframework.security.acls.model.AccessControlEntry;
+import org.springframework.security.acls.model.Acl;
+import org.springframework.security.acls.model.AlreadyExistsException;
+import org.springframework.security.acls.model.ChildrenExistException;
+import org.springframework.security.acls.model.MutableAcl;
+import org.springframework.security.acls.model.MutableAclService;
+import org.springframework.security.acls.model.NotFoundException;
+import org.springframework.security.acls.model.ObjectIdentity;
+import org.springframework.security.acls.model.PermissionGrantingStrategy;
+import org.springframework.security.acls.model.Sid;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.security.util.FieldUtils;
+import org.springframework.util.Assert;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+/**
+ * @author xduo
+ */
+//@Component("aclService")
+@Deprecated
+public class AclServiceOld implements MutableAclService {
+
+    private static final Logger logger = LoggerFactory.getLogger(AclServiceOld.class);
+
+    private static String ACL_INFO_FAMILY_TYPE_COLUMN = "t";
+    private static String ACL_INFO_FAMILY_OWNER_COLUMN = "o";
+    private static String ACL_INFO_FAMILY_PARENT_COLUMN = "p";
+    private static String ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN = "i";
+
+    private Serializer<SidInfo> sidSerializer = new Serializer<SidInfo>(SidInfo.class);
+    private Serializer<DomainObjectInfo> domainObjSerializer = new Serializer<DomainObjectInfo>(DomainObjectInfo.class);
+    private Serializer<AceInfo> aceSerializer = new Serializer<AceInfo>(AceInfo.class);
+
+    private String aclTableName = null;
+
+    private final Field fieldAces = FieldUtils.getField(AclImpl.class, "aces");
+
+    private final Field fieldAcl = FieldUtils.getField(AccessControlEntryImpl.class, "acl");
+
+    @Autowired
+    protected PermissionGrantingStrategy permissionGrantingStrategy;
+
+    @Autowired
+    protected PermissionFactory aclPermissionFactory;
+
+    @Autowired
+    protected AclAuthorizationStrategy aclAuthorizationStrategy;
+
+    @Autowired
+    protected AuditLogger auditLogger;
+
+    @Autowired
+    protected AclHBaseStorage aclHBaseStorage;
+
+    /**
+     * Makes the reflective handles on {@code AclImpl.aces} and
+     * {@code AccessControlEntryImpl.acl} accessible.
+     */
+    public AclServiceOld() throws IOException {
+        fieldAcl.setAccessible(true);
+        fieldAces.setAccessible(true);
+    }
+
+    /** Asks the HBase storage to prepare the table for this service and remembers its name. */
+    @PostConstruct
+    public void init() throws IOException {
+        final String preparedTable = aclHBaseStorage.prepareHBaseTable(AclServiceOld.class);
+        aclTableName = preparedTable;
+    }
+
+    /**
+     * Finds the children of an ACL by scanning the ACL table for rows whose parent
+     * column equals the serialized form of {@code parentIdentity}. Rows without a
+     * parent column are filtered out.
+     *
+     * @param parentIdentity identity of the parent object
+     * @return identities built from the row key and type column of every matching row;
+     *         empty when nothing matches
+     * @throws RuntimeException wrapping any {@link IOException} raised by the table access
+     */
+    @Override
+    public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
+        List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
+        Table htable = null;
+        ResultScanner scanner = null;
+        try {
+            htable = aclHBaseStorage.getTable(aclTableName);
+
+            Scan scan = new Scan();
+            SingleColumnValueFilter parentFilter = new SingleColumnValueFilter(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), CompareOp.EQUAL, domainObjSerializer.serialize(new DomainObjectInfo(parentIdentity)));
+            parentFilter.setFilterIfMissing(true);
+            scan.setFilter(parentFilter);
+
+            scanner = htable.getScanner(scan);
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                String id = Bytes.toString(result.getRow());
+                String type = Bytes.toString(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN)));
+
+                oids.add(new ObjectIdentityImpl(type, id));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            // The scanner was previously left open; release it before its table.
+            IOUtils.closeQuietly(scanner);
+            IOUtils.closeQuietly(htable);
+        }
+
+        return oids;
+    }
+
+    /**
+     * Reads the ACL of one identity without a sid filter. Unlike the sid-filtered
+     * overload, the presence of the identity in the result map is not asserted here.
+     */
+    @Override
+    public Acl readAclById(ObjectIdentity object) throws NotFoundException {
+        List<ObjectIdentity> single = Arrays.asList(object);
+        return readAclsById(single, null).get(object);
+    }
+
+    /** Reads the ACL of one identity restricted to {@code sids}, asserting that an entry came back. */
+    @Override
+    public Acl readAclById(ObjectIdentity object, List<Sid> sids) throws NotFoundException {
+        List<ObjectIdentity> single = Arrays.asList(object);
+        Map<ObjectIdentity, Acl> loaded = readAclsById(single, sids);
+        Assert.isTrue(loaded.containsKey(object), "There should have been an Acl entry for ObjectIdentity " + object);
+        return loaded.get(object);
+    }
+
+    /** Reads the ACLs of several identities without a sid filter. */
+    @Override
+    public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> objects) throws NotFoundException {
+        final List<Sid> noSidFilter = null;
+        return readAclsById(objects, noSidFilter);
+    }
+
+    /**
+     * Reads the ACL row of every identity in {@code oids} and builds an {@link AclImpl} for each,
+     * resolving a stored parent reference through {@link #readAclById(ObjectIdentity, List)}.
+     *
+     * @param oids identities to load; the row key is the string form of each identifier
+     * @param sids optional sid filter handed to {@code genAces}; null loads every entry
+     * @return map from each requested identity to its ACL
+     * @throws NotFoundException if an identity has no row, or only an empty one
+     * @throws RuntimeException wrapping any {@link IOException} from the table access
+     */
+    @Override
+    public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
+        Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
+        Table htable = null;
+        try {
+            htable = aclHBaseStorage.getTable(aclTableName);
+            byte[] infoFamily = Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY);
+
+            for (ObjectIdentity oid : oids) {
+                Result row = htable.get(new Get(Bytes.toBytes(String.valueOf(oid.getIdentifier()))));
+                if (null == row || row.isEmpty()) {
+                    throw new NotFoundException("Unable to find ACL information for object identity '" + oid + "'");
+                }
+
+                SidInfo owner = sidSerializer.deserialize(row.getValue(infoFamily, Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN)));
+                Sid ownerSid = null;
+                if (null != owner) {
+                    ownerSid = owner.isPrincipal() ? new PrincipalSid(owner.getSid()) : new GrantedAuthoritySid(owner.getSid());
+                }
+                boolean entriesInheriting = Bytes.toBoolean(row.getValue(infoFamily, Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN)));
+
+                // A stored parent reference is loaded as a full ACL of its own.
+                Acl parentAcl = null;
+                DomainObjectInfo parentInfo = domainObjSerializer.deserialize(row.getValue(infoFamily, Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN)));
+                if (null != parentInfo) {
+                    parentAcl = readAclById(new ObjectIdentityImpl(parentInfo.getType(), parentInfo.getId()), null);
+                }
+
+                AclImpl acl = new AclImpl(oid, oid.getIdentifier(), aclAuthorizationStrategy, permissionGrantingStrategy, parentAcl, null, entriesInheriting, ownerSid);
+                genAces(sids, row, acl);
+                aclMaps.put(oid, acl);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(htable);
+        }
+
+        return aclMaps;
+    }
+
+    @Override
+    public MutableAcl createAcl(ObjectIdentity objectIdentity) throws AlreadyExistsException {
+        Acl acl = null;
+
+        try {
+            acl = readAclById(objectIdentity);
+        } catch (NotFoundException e) {
+            //do nothing?
+        }
+        if (null != acl) {
+            throw new AlreadyExistsException("ACL of " + objectIdentity + " exists!");
+        }
+
+        Authentication auth = SecurityContextHolder.getContext().getAuthentication();
+        PrincipalSid sid = new PrincipalSid(auth);
+
+        Table htable = null;
+        try {
+            htable = aclHBaseStorage.getTable(aclTableName);
+
+            Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
+            put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
+            put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
+            put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
+
+            htable.put(put);
+
+            logger.debug("ACL of " + objectIdentity + " created successfully.");
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(htable);
+        }
+
+        return (MutableAcl) readAclById(objectIdentity);
+    }
+
+    @Override
+    public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException {
+        Table htable = null;
+        try {
+            htable = aclHBaseStorage.getTable(aclTableName);
+
+            Delete delete = new Delete(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
+
+            List<ObjectIdentity> children = findChildren(objectIdentity);
+            if (!deleteChildren && children.size() > 0) {
+                throw new ChildrenExistException("Children exists for " + objectIdentity);
+            }
+
+            for (ObjectIdentity oid : children) {
+                deleteAcl(oid, deleteChildren);
+            }
+
+            htable.delete(delete);
+
+            logger.debug("ACL of " + objectIdentity + " deleted successfully.");
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(htable);
+        }
+    }
+
+    @Override
+    public MutableAcl updateAcl(MutableAcl acl) throws NotFoundException {
+        try {
+            readAclById(acl.getObjectIdentity());
+        } catch (NotFoundException e) {
+            throw e;
+        }
+
+        Table htable = null;
+        try {
+            htable = aclHBaseStorage.getTable(aclTableName);
+
+            Delete delete = new Delete(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
+            delete.deleteFamily(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY));
+            htable.delete(delete);
+
+            Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
+
+            if (null != acl.getParentAcl()) {
+                put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
+            }
+
+            for (AccessControlEntry ace : acl.getEntries()) {
+                AceInfo aceInfo = new AceInfo(ace);
+                put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
+            }
+
+            if (!put.isEmpty()) {
+                htable.put(put);
+
+                logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully.");
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(htable);
+        }
+
+        return (MutableAcl) readAclById(acl.getObjectIdentity());
+    }
+
+    /**
+     * Loads access control entries from an HBase row into {@code acl}.
+     *
+     * @param sids   when non-null, only the entries stored under these sids are loaded;
+     *               when null, every entry of the aces column family is loaded
+     * @param result the HBase row of the ACL
+     * @param acl    the ACL whose entry list is replaced
+     */
+    private void genAces(List<Sid> sids, Result result, AclImpl acl) throws JsonParseException, JsonMappingException, IOException {
+        byte[] acesFamily = Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY);
+        List<AceInfo> aceInfos = new ArrayList<AceInfo>();
+        if (null != sids) {
+            // Just return aces in sids
+            for (Sid sid : sids) {
+                String sidName = null;
+                if (sid instanceof PrincipalSid) {
+                    sidName = ((PrincipalSid) sid).getPrincipal();
+                } else if (sid instanceof GrantedAuthoritySid) {
+                    sidName = ((GrantedAuthoritySid) sid).getGrantedAuthority();
+                }
+                if (null == sidName) {
+                    // A sid of any other kind has no column name to look up; skip it
+                    // instead of handing a null name to Bytes.toBytes.
+                    continue;
+                }
+
+                AceInfo aceInfo = aceSerializer.deserialize(result.getValue(acesFamily, Bytes.toBytes(sidName)));
+                if (null != aceInfo) {
+                    aceInfos.add(aceInfo);
+                }
+            }
+        } else {
+            NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(acesFamily);
+            for (byte[] serializedAce : familyMap.values()) {
+                AceInfo aceInfo = aceSerializer.deserialize(serializedAce);
+
+                if (null != aceInfo) {
+                    aceInfos.add(aceInfo);
+                }
+            }
+        }
+
+        // The position in the list doubles as the entry id; only non-null infos were collected above.
+        List<AccessControlEntry> newAces = new ArrayList<AccessControlEntry>();
+        for (int i = 0; i < aceInfos.size(); i++) {
+            AceInfo aceInfo = aceInfos.get(i);
+            SidInfo sidInfo = aceInfo.getSidInfo();
+
+            Sid sid = sidInfo.isPrincipal() ? new PrincipalSid(sidInfo.getSid()) : new GrantedAuthoritySid(sidInfo.getSid());
+            AccessControlEntry ace = new AccessControlEntryImpl(Long.valueOf(i), acl, sid, aclPermissionFactory.buildFromMask(aceInfo.getPermissionMask()), true, false, false);
+            newAces.add(ace);
+        }
+
+        this.setAces(acl, newAces);
+    }
+
+    /** Writes {@code aces} into {@code acl} through the reflective {@code fieldAces} handle. */
+    private void setAces(AclImpl acl, List<AccessControlEntry> aces) {
+        try {
+            this.fieldAces.set(acl, aces);
+        } catch (IllegalAccessException accessDenied) {
+            throw new IllegalStateException("Could not set AclImpl entries", accessDenied);
+        }
+    }
+
+    protected static class DomainObjectInfo {
+        private String id;
+        private String type;
+
+        public DomainObjectInfo() {
+        }
+
+        public DomainObjectInfo(ObjectIdentity oid) {
+            super();
+            this.id = (String) oid.getIdentifier();
+            this.type = oid.getType();
+        }
+
+        public Serializable getId() {
+            return id;
+        }
+
+        public void setId(String id) {
+            this.id = id;
+        }
+
+        public String getType() {
+            return type;
+        }
+
+        public void setType(String type) {
+            this.type = type;
+        }
+    }
+
+    /** Plain holder for a sid name plus a flag telling principals from granted authorities. */
+    protected static class SidInfo {
+        private String sid;
+        private boolean isPrincipal;
+
+        public SidInfo() {
+        }
+
+        /**
+         * Copies name and kind from a {@link PrincipalSid} or a {@link GrantedAuthoritySid};
+         * any other sid leaves both fields at their defaults.
+         */
+        public SidInfo(Sid sid) {
+            if (sid instanceof GrantedAuthoritySid) {
+                this.sid = ((GrantedAuthoritySid) sid).getGrantedAuthority();
+                this.isPrincipal = false;
+            } else if (sid instanceof PrincipalSid) {
+                this.sid = ((PrincipalSid) sid).getPrincipal();
+                this.isPrincipal = true;
+            }
+        }
+
+        public String getSid() {
+            return this.sid;
+        }
+
+        public void setSid(String sid) {
+            this.sid = sid;
+        }
+
+        public boolean isPrincipal() {
+            return this.isPrincipal;
+        }
+
+        public void setPrincipal(boolean isPrincipal) {
+            this.isPrincipal = isPrincipal;
+        }
+    }
+
+    protected static class AceInfo {
+        private SidInfo sidInfo;
+        private int permissionMask;
+
+        public AceInfo() {
+        }
+
+        public AceInfo(AccessControlEntry ace) {
+            super();
+            this.sidInfo = new SidInfo(ace.getSid());
+            this.permissionMask = ace.getPermission().getMask();
+        }
+
+        public SidInfo getSidInfo() {
+            return sidInfo;
+        }
+
+        public void setSidInfo(SidInfo sidInfo) {
+            this.sidInfo = sidInfo;
+        }
+
+        public int getPermissionMask() {
+            return permissionMask;
+        }
+
+        public void setPermissionMask(int permissionMask) {
+            this.permissionMask = permissionMask;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java b/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
new file mode 100644
index 0000000..f418e30
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.nio.ch.IOUtil;
+
+import java.io.IOException;
+
+/**
+ * Scans an HBase table and hands the scanner, together with a target
+ * {@link ResourceStore}, to a {@link ResultConverter}.
+ */
+public class AclTableMigrationTool {
+
+    public static final String ACL_TABLE_NAME = "_acl";
+
+    public static final String USER_TABLE_NAME = "_user";
+
+    private static final Logger logger = LoggerFactory.getLogger(AclTableMigrationTool.class);
+
+    /**
+     * Placeholder entry point: it only derives the two table names and does not
+     * touch {@code store} yet.
+     */
+    public static void migrate(ResourceStore store) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        String aclTableName = kylinConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME;
+        // NOTE(review): built from getMetadataUrl() while the ACL name uses getMetadataUrlPrefix() - confirm which is intended.
+        String userTableName = kylinConfig.getMetadataUrl() + USER_TABLE_NAME;
+
+
+    }
+
+    /**
+     * Scans {@code tableName}, if it exists, and passes the scanner and {@code store} to
+     * {@code converter}. The scanner, the table and the admin handle are closed in every case.
+     *
+     * @param kylinConfig supplies the storage URL used to obtain the HBase connection
+     * @param tableName   name of the HBase table to scan
+     * @param store       target store handed to the converter
+     * @param converter   callback that consumes the scanned rows
+     * @throws IOException if the admin, table or scanner cannot be obtained
+     */
+    public static void convertToResourceStore(KylinConfig kylinConfig, String tableName, ResourceStore store, ResultConverter converter) throws IOException {
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        Admin hbaseAdmin = new HBaseAdmin(conf);
+        Table table = null;
+        ResultScanner rs = null;
+        try {
+            if (!hbaseAdmin.tableExists(TableName.valueOf(tableName))) {
+                logger.info("HBase table " + tableName + " does not exist, nothing to convert.");
+                return;
+            }
+
+            table = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(TableName.valueOf(tableName));
+            rs = table.getScanner(new Scan());
+            converter.converter(rs, store);
+        } finally {
+            // closeQuietly tolerates null, so a failure before the scanner exists
+            // no longer turns into a NullPointerException that hides the real error.
+            IOUtils.closeQuietly(rs);
+            IOUtils.closeQuietly(table);
+            IOUtils.closeQuietly(hbaseAdmin);
+        }
+    }
+
+    /** Callback that consumes the rows of a scanned table with access to the target store. */
+    interface ResultConverter {
+        void converter(ResultScanner rs, ResourceStore store);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
index 9d94de1..299a512 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.service;
 
+<<<<<<< HEAD
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -42,6 +43,16 @@ import org.apache.kylin.rest.security.AclHBaseStorage;
 import org.apache.kylin.rest.util.Serializer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.prepost.PreAuthorize;
+=======
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+>>>>>>> 62cdc0f... KYLIN-2535 AclService and UserService store records via ResourceStore interface
 import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.userdetails.User;
 import org.springframework.security.core.userdetails.UserDetails;
@@ -49,95 +60,32 @@ import org.springframework.security.core.userdetails.UsernameNotFoundException;
 import org.springframework.security.provisioning.UserDetailsManager;
 import org.springframework.stereotype.Component;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
+import javax.annotation.PostConstruct;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  */
 @Component("userService")
 public class UserService implements UserDetailsManager {
 
-    private static final String PWD_PREFIX = "PWD:";
+    private Logger logger = LoggerFactory.getLogger(UserService.class);
 
-    private Serializer<UserGrantedAuthority[]> ugaSerializer = new Serializer<UserGrantedAuthority[]>(UserGrantedAuthority[].class);
+    private static final String DIR_PREFIX = "/user/";
 
-    private String userTableName = null;
+    private static final String PWD_PREFIX = "PWD:";
 
-    @Autowired
-    protected AclHBaseStorage aclHBaseStorage;
+    protected ResourceStore aclStore;
 
     @PostConstruct
     public void init() throws IOException {
-        userTableName = aclHBaseStorage.prepareHBaseTable(UserService.class);
-    }
-
-    @Override
-    public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
-        Table htable = null;
-        try {
-            htable = aclHBaseStorage.getTable(userTableName);
-
-            Get get = new Get(Bytes.toBytes(username));
-            get.addFamily(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY));
-            Result result = htable.get(get);
-
-            User user = hbaseRowToUser(result);
-            if (user == null)
-                throw new UsernameNotFoundException("User " + username + " not found.");
-
-            return user;
-        } catch (IOException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(htable);
-        }
-    }
-
-    private User hbaseRowToUser(Result result) throws JsonParseException, JsonMappingException, IOException {
-        if (null == result || result.isEmpty())
-            return null;
-
-        String username = Bytes.toString(result.getRow());
-
-        byte[] valueBytes = result.getValue(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));
-        UserGrantedAuthority[] deserialized = ugaSerializer.deserialize(valueBytes);
-
-        String password = "";
-        List<UserGrantedAuthority> authorities = Collections.emptyList();
-
-        // password is stored at [0] of authorities for backward compatibility
-        if (deserialized != null) {
-            if (deserialized.length > 0 && deserialized[0].getAuthority().startsWith(PWD_PREFIX)) {
-                password = deserialized[0].getAuthority().substring(PWD_PREFIX.length());
-                authorities = Arrays.asList(deserialized).subList(1, deserialized.length);
-            } else {
-                authorities = Arrays.asList(deserialized);
-            }
-        }
-
-        return new User(username, password, authorities);
+        aclStore = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+        logger.debug("UserService init");
     }
 
-    private Pair<byte[], byte[]> userToHBaseRow(UserDetails user) throws JsonProcessingException {
-        byte[] key = Bytes.toBytes(user.getUsername());
-
-        Collection<? extends GrantedAuthority> authorities = user.getAuthorities();
-        if (authorities == null)
-            authorities = Collections.emptyList();
-
-        UserGrantedAuthority[] serializing = new UserGrantedAuthority[authorities.size() + 1];
-
-        // password is stored as the [0] authority
-        serializing[0] = new UserGrantedAuthority(PWD_PREFIX + user.getPassword());
-        int i = 1;
-        for (GrantedAuthority a : authorities) {
-            serializing[i++] = new UserGrantedAuthority(a.getAuthority());
-        }
-
-        byte[] value = ugaSerializer.serialize(serializing);
-        return Pair.newPair(key, value);
-    }
 
     @Override
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
@@ -148,58 +96,56 @@ public class UserService implements UserDetailsManager {
     @Override
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
     public void updateUser(UserDetails user) {
-        Table htable = null;
         try {
-            htable = aclHBaseStorage.getTable(userTableName);
-
-            Pair<byte[], byte[]> pair = userToHBaseRow(user);
-            Put put = new Put(pair.getKey());
-
-            put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());
-
-            htable.put(put);
+            deleteUser(user.getUsername());
+            String id = getId(user.getUsername());
+            aclStore.putResource(id, new UserInfo(user), 0, UserInfoSerializer.getInstance());
+            logger.debug("update user : {}", user.getUsername());
         } catch (IOException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(htable);
+            throw new RuntimeException(e);
         }
     }
 
     @Override
+<<<<<<< HEAD
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
     public void deleteUser(String username) {
         Table htable = null;
+=======
+    public void deleteUser(String userName) {
+>>>>>>> 62cdc0f... KYLIN-2535 AclService and UserService store records via ResourceStore interface
         try {
-            htable = aclHBaseStorage.getTable(userTableName);
-
-            Delete delete = new Delete(Bytes.toBytes(username));
-
-            htable.delete(delete);
+            String id = getId(userName);
+            aclStore.deleteResource(id);
+            logger.debug("delete user : {}", userName);
         } catch (IOException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(htable);
+            throw new RuntimeException(e);
         }
     }
 
     @Override
-    public void changePassword(String oldPassword, String newPassword) {
+    public void changePassword(String oldPassword, String newPasswor) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public boolean userExists(String username) {
-        Table htable = null;
+    public boolean userExists(String userName) {
         try {
-            htable = aclHBaseStorage.getTable(userTableName);
-
-            Result result = htable.get(new Get(Bytes.toBytes(username)));
+            logger.debug("judge user exist: {}", userName);
+            return aclStore.exists(getId(userName));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 
-            return null != result && !result.isEmpty();
+    @Override
+    public UserDetails loadUserByUsername(String userName) throws UsernameNotFoundException {
+        try {
+            UserInfo userInfo = aclStore.getResource(getId(userName), UserInfo.class, UserInfoSerializer.getInstance());
+            logger.debug("load user : {}", userName);
+            return wrap(userInfo);
         } catch (IOException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            IOUtils.closeQuietly(htable);
+            throw new RuntimeException(e);
         }
     }
 
@@ -216,78 +162,159 @@ public class UserService implements UserDetailsManager {
     }
 
     public List<UserDetails> listUsers() {
-        Scan s = new Scan();
-        s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));
-
         List<UserDetails> all = new ArrayList<UserDetails>();
-        Table htable = null;
-        ResultScanner scanner = null;
         try {
-            htable = aclHBaseStorage.getTable(userTableName);
-            scanner = htable.getScanner(s);
-
-            for (Result result = scanner.next(); result != null; result = scanner.next()) {
-                User user = hbaseRowToUser(result);
-                all.add(user);
+            List<UserInfo> userInfos = aclStore.getAllResources(DIR_PREFIX, UserInfo.class, UserInfoSerializer.getInstance());
+            for (UserInfo info : userInfos) {
+                all.add(wrap(info));
             }
         } catch (IOException e) {
-            throw new RuntimeException("Failed to scan users", e);
-        } finally {
-            IOUtils.closeQuietly(scanner);
-            IOUtils.closeQuietly(htable);
+            throw new RuntimeException("Failed to list users", e);
         }
         return all;
     }
 
-    public static class UserGrantedAuthority implements GrantedAuthority {
-        private static final long serialVersionUID = -5128905636841891058L;
-        private String authority;
+    private String getId(String userName) {
+        return DIR_PREFIX + userName;
+    }
 
-        public UserGrantedAuthority() {
+    private User wrap(UserInfo userInfo) {
+        List<GrantedAuthority> authorities = new ArrayList<>();
+        List<String> auths = userInfo.getAuthorities();
+        for (String str : auths) {
+            authorities.add(new UserGrantedAuthority(str));
         }
+        return new User(userInfo.getUsername(), userInfo.getPassword(), authorities);
+    }
 
-        public UserGrantedAuthority(String authority) {
-            setAuthority(authority);
-        }
 
-        @Override
-        public String getAuthority() {
-            return authority;
-        }
+}
 
-        public void setAuthority(String authority) {
-            this.authority = authority;
-        }
+class UserInfo extends RootPersistentEntity {
+    @JsonProperty()
+    private String username;
+    @JsonProperty()
+    private String password;
+    @JsonProperty()
+    private List<String> authorities = new ArrayList<>();
+
+    public UserInfo(String username, String password, List<String> authorities) {
+        this.username = username;
+        this.password = password;
+        this.authorities = authorities;
+    }
 
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((authority == null) ? 0 : authority.hashCode());
-            return result;
+    public UserInfo(UserDetails user) {
+        this.username = user.getUsername();
+        this.password = user.getPassword();
+        for (GrantedAuthority a : user.getAuthorities()) {
+            this.authorities.add(a.getAuthority());
         }
+    }
 
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null)
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            UserGrantedAuthority other = (UserGrantedAuthority) obj;
-            if (authority == null) {
-                if (other.authority != null)
-                    return false;
-            } else if (!authority.equals(other.authority))
-                return false;
-            return true;
-        }
+    public UserInfo() {
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public List<String> getAuthorities() {
+        return authorities;
+    }
+
+    public void setAuthorities(List<String> authorities) {
+        this.authorities = authorities;
+    }
+
+}
+
+class UserInfoSerializer implements Serializer<UserInfo> {
+
+    private static final UserInfoSerializer serializer = new UserInfoSerializer();
+
+    private UserInfoSerializer() {
 
-        @Override
-        public String toString() {
-            return authority;
-        }
     }
 
+    public static UserInfoSerializer getInstance() {
+        return serializer;
+    }
+
+
+    @Override
+    public void serialize(UserInfo userInfo, DataOutputStream out) throws IOException {
+        String json = JsonUtil.writeValueAsString(userInfo);
+        out.writeUTF(json);
+    }
+
+    @Override
+    public UserInfo deserialize(DataInputStream in) throws IOException {
+        String json = in.readUTF();
+        return JsonUtil.readValue(json, UserInfo.class);
+    }
+}
+
+class UserGrantedAuthority implements GrantedAuthority {
+    private static final long serialVersionUID = -5128905636841891058L;
+
+    private String authority;
+
+    public UserGrantedAuthority() {
+    }
+
+    public UserGrantedAuthority(String authority) {
+        setAuthority(authority);
+    }
+
+    @Override
+    public String getAuthority() {
+        return authority;
+    }
+
+    public void setAuthority(String authority) {
+        this.authority = authority;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((authority == null) ? 0 : authority.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        UserGrantedAuthority other = (UserGrantedAuthority) obj;
+        if (authority == null) {
+            if (other.authority != null)
+                return false;
+        } else if (!authority.equals(other.authority))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return authority;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/95c6d4fa/server-base/src/main/java/org/apache/kylin/rest/service/UserServiceOld.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/UserServiceOld.java b/server-base/src/main/java/org/apache/kylin/rest/service/UserServiceOld.java
new file mode 100644
index 0000000..79138b2
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/UserServiceOld.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.rest.security.AclHBaseStorage;
+import org.apache.kylin.rest.util.Serializer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.core.GrantedAuthority;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.core.userdetails.UserDetails;
+import org.springframework.security.core.userdetails.UsernameNotFoundException;
+import org.springframework.security.provisioning.UserDetailsManager;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Legacy {@link UserDetailsManager} that keeps users in the HBase table prepared by
+ * {@link AclHBaseStorage}: one row per username, whose authority cell holds a serialized
+ * array with "PWD:" + password at [0] followed by the granted authorities.
+ *
+ * @deprecated no longer registered as the "userService" bean (see the commented-out
+ *             annotation below); UserService, backed by ResourceStore, carries that name now.
+ */
+//@Component("userService")
+@Deprecated
+public class UserServiceOld implements UserDetailsManager {
+
+    private static final String PWD_PREFIX = "PWD:";
+
+    private final Serializer<UserGrantedAuthority[]> ugaSerializer = new Serializer<UserGrantedAuthority[]>(UserGrantedAuthority[].class);
+
+    private String userTableName;
+
+    @Autowired
+    protected AclHBaseStorage aclHBaseStorage;
+
+    /** Obtains the user table name from the injected storage once the bean is constructed. */
+    @PostConstruct
+    public void init() throws IOException {
+        userTableName = aclHBaseStorage.prepareHBaseTable(UserServiceOld.class);
+    }
+
+    /**
+     * Reads the row keyed by {@code username} and converts it to a Spring Security user.
+     *
+     * @throws UsernameNotFoundException if the row is missing or empty
+     * @throws RuntimeException wrapping any {@link IOException} raised by the table access
+     */
+    @Override
+    public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
+        Table htable = null;
+        try {
+            htable = aclHBaseStorage.getTable(userTableName);
+            Get get = new Get(Bytes.toBytes(username));
+            get.addFamily(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY));
+            User user = hbaseRowToUser(htable.get(get));
+            if (user == null)
+                throw new UsernameNotFoundException("User " + username + " not found.");
+            return user;
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(htable);
+        }
+    }
+
+    /**
+     * Converts one row into a {@link User}: the row key is the username and the authority
+     * cell is deserialized into the password-plus-authorities array.
+     *
+     * @return the user, or {@code null} when {@code result} is null or empty
+     */
+    private User hbaseRowToUser(Result result) throws JsonParseException, JsonMappingException, IOException {
+        if (null == result || result.isEmpty())
+            return null;
+
+        String username = Bytes.toString(result.getRow());
+        byte[] valueBytes = result.getValue(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));
+        UserGrantedAuthority[] deserialized = ugaSerializer.deserialize(valueBytes);
+        String password = "";
+        List<UserGrantedAuthority> authorities = Collections.emptyList();
+        // password is stored at [0] of authorities for backward compatibility
+        if (deserialized != null) {
+            if (deserialized.length > 0 && deserialized[0].getAuthority().startsWith(PWD_PREFIX)) {
+                password = deserialized[0].getAuthority().substring(PWD_PREFIX.length());
+                authorities = Arrays.asList(deserialized).subList(1, deserialized.length);
+            } else {
+                authorities = Arrays.asList(deserialized);
+            }
+        }
+        return new User(username, password, authorities);
+    }
+
+    /**
+     * Builds the (row key, cell value) pair for {@code user}: the key is the username bytes,
+     * the value is the serialized array of "PWD:" + password followed by every authority.
+     */
+    private Pair<byte[], byte[]> userToHBaseRow(UserDetails user) throws JsonProcessingException {
+        byte[] key = Bytes.toBytes(user.getUsername());
+        Collection<? extends GrantedAuthority> authorities = user.getAuthorities();
+        if (authorities == null)
+            authorities = Collections.emptyList();
+        // password is stored as the [0] authority
+        UserGrantedAuthority[] serializing = new UserGrantedAuthority[authorities.size() + 1];
+        serializing[0] = new UserGrantedAuthority(PWD_PREFIX + user.getPassword());
+        int i = 1;
+        for (GrantedAuthority a : authorities) {
+            serializing[i++] = new UserGrantedAuthority(a.getAuthority());
+        }
+        return Pair.newPair(key, ugaSerializer.serialize(serializing));
+    }
+
+    /** Delegates to {@link #updateUser(UserDetails)}; no existence check is performed first. */
+    @Override
+    public void createUser(UserDetails user) {
+        updateUser(user);
+    }
+
+    /** Puts the serialized user into the authority column of the row keyed by its username. */
+    @Override
+    public void updateUser(UserDetails user) {
+        Table htable = null;
+        try {
+            htable = aclHBaseStorage.getTable(userTableName);
+            Pair<byte[], byte[]> pair = userToHBaseRow(user);
+            Put put = new Put(pair.getKey());
+            put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());
+            htable.put(put);
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(htable);
+        }
+    }
+
+    /** Deletes the row keyed by {@code username}. */
+    @Override
+    public void deleteUser(String username) {
+        Table htable = null;
+        try {
+            htable = aclHBaseStorage.getTable(userTableName);
+            htable.delete(new Delete(Bytes.toBytes(username)));
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(htable);
+        }
+    }
+
+    /** Not supported by this implementation. */
+    @Override
+    public void changePassword(String oldPassword, String newPassword) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Returns whether a non-empty row exists for {@code username}. */
+    @Override
+    public boolean userExists(String username) {
+        Table htable = null;
+        try {
+            htable = aclHBaseStorage.getTable(userTableName);
+            Result result = htable.get(new Get(Bytes.toBytes(username)));
+            return null != result && !result.isEmpty();
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(htable);
+        }
+    }
+
+    /** Returns the distinct authority names held by any user, in first-seen order. */
+    public List<String> listUserAuthorities() {
+        // a LinkedHashSet de-duplicates without rescanning the result for every authority
+        java.util.Set<String> distinct = new java.util.LinkedHashSet<String>();
+        for (UserDetails user : listUsers()) {
+            for (GrantedAuthority auth : user.getAuthorities()) {
+                distinct.add(auth.getAuthority());
+            }
+        }
+        return new ArrayList<String>(distinct);
+    }
+
+    /** Scans the authority column of every row and returns the users found. */
+    public List<UserDetails> listUsers() {
+        Scan s = new Scan();
+        s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));
+        List<UserDetails> all = new ArrayList<UserDetails>();
+        Table htable = null;
+        ResultScanner scanner = null;
+        try {
+            htable = aclHBaseStorage.getTable(userTableName);
+            scanner = htable.getScanner(s);
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                User user = hbaseRowToUser(result);
+                // hbaseRowToUser yields null for an empty row; skip it so the list never holds null
+                if (user != null) {
+                    all.add(user);
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to scan users", e);
+        } finally {
+            IOUtils.closeQuietly(scanner);
+            IOUtils.closeQuietly(htable);
+        }
+        return all;
+    }
+
+    /** Mutable {@link GrantedAuthority} wrapping one authority string; equality is by that string. */
+    public static class UserGrantedAuthority implements GrantedAuthority {
+        private static final long serialVersionUID = -5128905636841891058L;
+        private String authority;
+
+        public UserGrantedAuthority() {
+        }
+
+        public UserGrantedAuthority(String authority) {
+            setAuthority(authority);
+        }
+
+        @Override
+        public String getAuthority() {
+            return authority;
+        }
+
+        public void setAuthority(String authority) {
+            this.authority = authority;
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 + ((authority == null) ? 0 : authority.hashCode());
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+            String theirs = ((UserGrantedAuthority) obj).authority;
+            return authority == null ? theirs == null : authority.equals(theirs);
+        }
+
+        @Override
+        public String toString() {
+            return authority;
+        }
+    }
+
+}