You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 12:50:31 UTC

[6/6] incubator-kylin git commit: KYLIN-875 Refactor core-common, remove dependency on hadoop/hbase

KYLIN-875 Refactor core-common, remove dependency on hadoop/hbase


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d2456215
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d2456215
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d2456215

Branch: refs/heads/0.8
Commit: d2456215cc881358025561d0375e787fb960daf1
Parents: dd5f2b2
Author: Li, Yang <ya...@ebay.com>
Authored: Wed Jul 22 18:49:06 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Jul 22 18:49:46 2015 +0800

----------------------------------------------------------------------
 core-common/pom.xml                             |  64 +-
 .../org/apache/kylin/common/lock/JobLock.java   |   9 -
 .../apache/kylin/common/lock/MockJobLock.java   |  15 -
 .../kylin/common/lock/ZookeeperJobLock.java     |  83 ---
 .../org/apache/kylin/common/mr/KylinMapper.java |  33 --
 .../apache/kylin/common/mr/KylinReducer.java    |  31 -
 .../common/persistence/HBaseConnection.java     | 167 ------
 .../common/persistence/HBaseResourceStore.java  | 323 ----------
 .../kylin/common/persistence/ResourceStore.java |  13 +-
 .../kylin/common/restclient/RestClient.java     |  12 +-
 .../org/apache/kylin/common/util/Bytes.java     | 557 ++++-------------
 .../apache/kylin/common/util/BytesSplitter.java |   8 +-
 .../org/apache/kylin/common/util/BytesUtil.java |  26 +-
 .../common/util/HBaseRegionSizeCalculator.java  | 128 ----
 .../apache/kylin/common/util/HadoopUtil.java    | 162 -----
 .../apache/kylin/common/util/HiveClient.java    | 162 -----
 .../org/apache/kylin/common/util/JsonUtil.java  |  18 +-
 .../org/apache/kylin/common/util/TarGZUtil.java |  69 ---
 .../persistence/ITHBaseResourceStoreTest.java   | 208 -------
 .../common/util/HBaseMetadataTestCase.java      |  73 ---
 .../common/util/HBaseMiniclusterHelper.java     | 167 ------
 .../apache/kylin/common/util/HbaseImporter.java | 121 ----
 .../kylin/common/util/IdentityUtilTest.java     |   9 +-
 .../kylin/common/util/RandomSamplerTest.java    |  11 +-
 .../org/apache/kylin/common/util/RangeTest.java |   1 +
 .../apache/kylin/common/util/SSHClientTest.java |   6 +-
 core-cube/pom.xml                               |   4 +-
 .../java/org/apache/kylin/cube/CubeManager.java |   8 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java  |  13 +-
 core-dictionary/pom.xml                         |   4 +-
 .../apache/kylin/dict/DictionaryManager.java    |  74 +--
 .../dict/DistinctColumnValuesProvider.java      |  17 +
 .../org/apache/kylin/dict/lookup/FileTable.java | 100 ----
 .../kylin/dict/lookup/FileTableReader.java      | 220 -------
 .../org/apache/kylin/dict/LookupTableTest.java  |  76 ---
 .../org/apache/kylin/dict/TableReaderTest.java  |  48 --
 .../java/org/apache/kylin/job/lock/JobLock.java |   9 +
 .../org/apache/kylin/job/lock/MockJobLock.java  |  15 +
 core-metadata/pom.xml                           |   5 +-
 engine-mr/pom.xml                               |  40 ++
 .../apache/kylin/engine/mr/DFSFileTable.java    |  99 ++++
 .../kylin/engine/mr/DFSFileTableReader.java     | 219 +++++++
 .../org/apache/kylin/engine/mr/HadoopUtil.java  | 109 ++++
 .../org/apache/kylin/engine/mr/KylinMapper.java |  32 +
 .../apache/kylin/engine/mr/KylinReducer.java    |  30 +
 .../apache/kylin/engine/mr/LookupTableTest.java |  74 +++
 .../apache/kylin/engine/mr/TableReaderTest.java |  46 ++
 invertedindex/pom.xml                           |  53 +-
 .../apache/kylin/invertedindex/IIInstance.java  |  24 +-
 .../apache/kylin/invertedindex/IIManager.java   |   6 +-
 .../apache/kylin/invertedindex/tools/IICLI.java | 108 ----
 job/dependency-reduced-pom.xml                  | 594 -------------------
 job/pom.xml                                     |  80 +--
 .../engine/mr/steps/InMemCuboidMapper.java      |   2 +-
 .../engine/mr/steps/InMemCuboidReducer.java     |   2 +-
 .../mr/steps/MergeCuboidFromStorageMapper.java  |   2 +-
 .../engine/mr/steps/MergeStatisticsStep.java    |   3 +-
 .../engine/mr/steps/SaveStatisticsStep.java     |   2 +-
 .../java/org/apache/kylin/job/Scheduler.java    |   2 +-
 .../apache/kylin/job/common/HqlExecutable.java  |   3 +-
 .../kylin/job/hadoop/AbstractHadoopJob.java     |   2 +-
 .../cardinality/ColumnCardinalityMapper.java    |   2 +-
 .../cardinality/ColumnCardinalityReducer.java   |   3 +-
 .../job/hadoop/cube/BaseCuboidMapperBase.java   |   3 +-
 .../kylin/job/hadoop/cube/CubeHFileMapper.java  |   2 +-
 .../kylin/job/hadoop/cube/CuboidReducer.java    |   3 +-
 .../cube/FactDistinctColumnsCombiner.java       |   3 +-
 .../cube/FactDistinctColumnsMapperBase.java     |   2 +-
 .../hadoop/cube/FactDistinctColumnsReducer.java |   3 +-
 .../job/hadoop/cube/MergeCuboidMapper.java      |   3 +-
 .../kylin/job/hadoop/cube/NDCuboidMapper.java   |   4 +-
 .../job/hadoop/cube/NewBaseCuboidMapper.java    |   2 +-
 .../hadoop/cube/RangeKeyDistributionMapper.java |   2 +-
 .../cube/RangeKeyDistributionReducer.java       |   3 +-
 .../cube/RowKeyDistributionCheckerMapper.java   |   3 +-
 .../cube/RowKeyDistributionCheckerReducer.java  |   3 +-
 .../job/hadoop/dict/CreateDictionaryJob.java    |  20 +-
 .../dict/CreateInvertedIndexDictionaryJob.java  |  19 +-
 .../kylin/job/hadoop/hbase/CreateHTableJob.java |   2 +-
 .../job/hadoop/invertedindex/IIBulkLoadJob.java |  16 +-
 .../hadoop/invertedindex/IICreateHFileJob.java  |   4 +-
 .../invertedindex/IICreateHFileMapper.java      |   2 +-
 .../hadoop/invertedindex/IICreateHTableJob.java |   4 +-
 .../IIDistinctColumnsCombiner.java              |   3 +-
 .../invertedindex/IIDistinctColumnsJob.java     |   2 +-
 .../invertedindex/IIDistinctColumnsMapper.java  |   3 +-
 .../invertedindex/IIDistinctColumnsReducer.java |   3 +-
 .../hadoop/invertedindex/InvertedIndexJob.java  |   2 +-
 .../invertedindex/InvertedIndexMapper.java      |   2 +-
 .../invertedindex/InvertedIndexReducer.java     |   2 +-
 .../RandomKeyDistributionMapper.java            |   3 +-
 .../RandomKeyDistributionReducer.java           |   3 +-
 .../job/impl/threadpool/DefaultScheduler.java   |   3 +-
 .../kylin/job/streaming/CubeStreamConsumer.java |   4 +-
 .../kylin/job/streaming/StreamingBootstrap.java |   5 +-
 .../kylin/job/tools/DeployCoprocessorCLI.java   |   5 +-
 .../job/tools/GridTableHBaseBenchmark.java      |   2 +-
 .../java/org/apache/kylin/job/tools/IICLI.java  | 108 ++++
 .../org/apache/kylin/job/tools/TarGZUtil.java   |  69 +++
 .../apache/kylin/source/hive/HiveMRInput.java   |   2 +-
 .../source/hive/HiveSourceTableLoader.java      |   4 +-
 .../org/apache/kylin/source/hive/HiveTable.java |   5 +-
 .../kylin/storage/hbase/HBaseMROutput2.java     |   2 +-
 .../kylin/job/BuildCubeWithEngineTest.java      |   4 +-
 .../kylin/job/BuildCubeWithStreamTest.java      |   2 +-
 .../apache/kylin/job/BuildIIWithEngineTest.java |   5 +-
 .../apache/kylin/job/BuildIIWithStreamTest.java |   2 +-
 .../kylin/job/DeployLocalMetaToRemoteTest.java  |   2 +-
 .../java/org/apache/kylin/job/DeployUtil.java   |   3 +-
 .../org/apache/kylin/job/ExportHBaseData.java   |   5 +-
 .../job/ITKafkaBasedIIStreamBuilderTest.java    |   2 +-
 .../org/apache/kylin/job/ImportHBaseData.java   |   9 +-
 .../cubev2/FactDistinctColumnsReducerTest.java  |   3 +-
 .../kylin/job/hadoop/hdfs/ITHdfsOpsTest.java    |   2 +-
 .../job/impl/threadpool/BaseSchedulerTest.java  |   4 +-
 .../job/inmemcubing/InMemCubeBuilderTest.java   |   6 +-
 .../job/streaming/CubeStreamConsumerTest.java   |   3 +-
 .../hive/ITHiveSourceTableLoaderTest.java       |   2 +-
 .../source/hive/ITHiveTableReaderTest.java      |   2 +-
 .../source/hive/ITSnapshotManagerTest.java      |   2 +-
 query/pom.xml                                   |  21 +-
 .../kylin/query/test/ITKylinQueryTest.java      |   2 +-
 server/pom.xml                                  |  25 +-
 .../kylin/rest/controller/JobController.java    |   2 +-
 .../rest/security/RealAclHBaseStorage.java      |   2 +-
 .../apache/kylin/rest/service/CubeService.java  |   8 +-
 .../apache/kylin/rest/service/QueryService.java |   2 +-
 .../org/apache/kylin/jdbc/ITJDBCDriverTest.java |   2 +-
 source-hive/pom.xml                             |  11 +
 .../apache/kylin/source/hive/HiveClient.java    | 162 +++++
 storage-hbase/pom.xml                           |  38 ++
 .../kylin/storage/hbase/HBaseConnection.java    | 234 ++++++++
 .../hbase/HBaseRegionSizeCalculator.java        | 128 ++++
 .../kylin/storage/hbase/HBaseResourceStore.java | 326 ++++++++++
 .../kylin/storage/hbase/ZookeeperJobLock.java   |  83 +++
 .../storage/hbase/HBaseMetadataTestCase.java    |  75 +++
 .../storage/hbase/HBaseMiniclusterHelper.java   | 167 ++++++
 .../kylin/storage/hbase/HbaseImporter.java      | 121 ++++
 .../storage/hbase/ITHBaseResourceStoreTest.java | 211 +++++++
 storage/pom.xml                                 |  30 +-
 .../storage/cube/CubeHBaseReadonlyStore.java    |   2 +-
 .../gridtable/diskstore/HadoopFileSystem.java   |   2 +-
 .../kylin/storage/hbase/CubeStorageQuery.java   |   2 +-
 .../hbase/InvertedIndexStorageQuery.java        |   2 +-
 .../kylin/storage/hbase/PingHBaseCLI.java       |   6 +-
 .../storage/hbase/ITInvertedIndexHBaseTest.java |  10 +-
 .../kylin/storage/test/ITStorageTest.java       |   2 +-
 streaming/pom.xml                               |  45 +-
 .../invertedindex/IIStreamConsumer.java         |   3 +-
 .../streaming/invertedindex/SliceBuilder.java   |  45 +-
 150 files changed, 2924 insertions(+), 3844 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index c71a4ad..adb189e 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -60,6 +60,10 @@
             <artifactId>commons-email</artifactId>
         </dependency>
         <dependency>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
@@ -76,60 +80,30 @@
             <artifactId>compress-lzf</artifactId>
         </dependency>
 
-        <!-- Env & Test -->
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
+		<!-- Logging -->
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <scope>provided</scope>
-            <!-- protobuf version conflict with hbase -->
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-common</artifactId>
-            <scope>provided</scope>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <scope>provided</scope>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-server</artifactId>
-            <scope>provided</scope>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
         </dependency>
+
+        <!-- Env & Test -->
         <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-testing-util</artifactId>
-            <version>${hbase-hadoop2.version}</version>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet.jsp</groupId>
-                    <artifactId>jsp-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-         <dependency>
-            <groupId>org.apache.hive.hcatalog</groupId>
-            <artifactId>hive-hcatalog-core</artifactId>
-            <version>${hive-hcatalog.version}</version>
-            <scope>provided</scope>
-          </dependency>
+        </dependency>
+        
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
deleted file mode 100644
index 7fdb64c..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.kylin.common.lock;
-
-/**
- */
-public interface JobLock {
-    boolean lock();
-
-    void unlock();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
deleted file mode 100644
index 230d4d8..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.kylin.common.lock;
-
-/**
- */
-public class MockJobLock implements JobLock {
-    @Override
-    public boolean lock() {
-        return true;
-    }
-
-    @Override
-    public void unlock() {
-        return;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
deleted file mode 100644
index 603894c..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.kylin.common.lock;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- */
-public class ZookeeperJobLock implements JobLock {
-    private Logger logger = LoggerFactory.getLogger(ZookeeperJobLock.class);
-
-    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-
-    private String scheduleID;
-    private InterProcessMutex sharedLock;
-    private CuratorFramework zkClient;
-
-    @Override
-    public boolean lock() {
-        this.scheduleID = schedulerId();
-        String ZKConnectString = getZKConnectString();
-        if (StringUtils.isEmpty(ZKConnectString)) {
-            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
-        }
-
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        this.zkClient = CuratorFrameworkFactory.newClient(ZKConnectString, retryPolicy);
-        this.zkClient.start();
-        this.sharedLock = new InterProcessMutex(zkClient, this.scheduleID);
-        boolean hasLock = false;
-        try {
-            hasLock = sharedLock.acquire(3, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            logger.warn("error acquire lock", e);
-        }
-        if (!hasLock) {
-            logger.warn("fail to acquire lock, scheduler has not been started");
-            zkClient.close();
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public void unlock() {
-        releaseLock();
-    }
-
-    private String getZKConnectString() {
-        Configuration conf = HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl());
-        return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-    }
-
-    private void releaseLock() {
-        try {
-            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
-                // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
-                if (zkClient.checkExists().forPath(scheduleID) != null) {
-                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(scheduleID);
-                }
-            }
-        } catch (Exception e) {
-            logger.error("error release lock:" + scheduleID);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private String schedulerId() {
-        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java b/core-common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
deleted file mode 100644
index fd0b337..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.mr;
-
-
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- */
-public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
-    
-    protected void bindCurrentConfiguration(Configuration conf) {
-        HadoopUtil.setCurrentConfiguration(conf);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/mr/KylinReducer.java b/core-common/src/main/java/org/apache/kylin/common/mr/KylinReducer.java
deleted file mode 100644
index bc63df7..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/mr/KylinReducer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.mr;
-
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- */
-public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
-    protected void bindCurrentConfiguration(Configuration conf) {
-        HadoopUtil.setCurrentConfiguration(conf);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
deleted file mode 100644
index a574d0b..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.persistence;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author yangli9
- * 
- */
-public class HBaseConnection {
-
-    private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
-
-    private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
-    private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>();
-
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                for (HConnection conn : ConnPool.values()) {
-                    try {
-                        conn.close();
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        });
-    }
-
-    // returned HConnection can be shared by multiple threads and does not require close()
-    public static HConnection get(String url) {
-        // find configuration
-        Configuration conf = ConfigCache.get(url);
-        if (conf == null) {
-            conf = HadoopUtil.newHBaseConfiguration(url);
-            ConfigCache.put(url, conf);
-        }
-
-        HConnection connection = ConnPool.get(url);
-        try {
-            while (true) {
-                // I don't use DCL since recreate a connection is not a big issue.
-                if (connection == null || connection.isClosed()) {
-                    logger.info("connection is null or closed, creating a new one");
-                    connection = HConnectionManager.createConnection(conf);
-                    ConnPool.put(url, connection);
-                }
-
-                if (connection == null || connection.isClosed()) {
-                    Thread.sleep(10000);// wait a while and retry
-                } else {
-                    break;
-                }
-            }
-
-        } catch (Throwable t) {
-            logger.error("Error when open connection " + url, t);
-            throw new StorageException("Error when open connection " + url, t);
-        }
-
-        return connection;
-    }
-
-    public static boolean tableExists(HConnection conn, String tableName) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
-        try {
-            return hbase.tableExists(TableName.valueOf(tableName));
-        } finally {
-            hbase.close();
-        }
-    }
-
-    public static boolean tableExists(String hbaseUrl, String tableName) throws IOException {
-        return tableExists(HBaseConnection.get(hbaseUrl), tableName);
-    }
-
-    public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException {
-        createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
-    }
-
-    public static void deleteTable(String hbaseUrl, String tableName) throws IOException {
-        deleteTable(HBaseConnection.get(hbaseUrl), tableName);
-    }
-
-    public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
-
-        try {
-            if (tableExists(conn, tableName)) {
-                logger.debug("HTable '" + tableName + "' already exists");
-                return;
-            }
-
-            logger.debug("Creating HTable '" + tableName + "'");
-
-            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-
-            if (null != families && families.length > 0) {
-                for (String family : families) {
-                    HColumnDescriptor fd = new HColumnDescriptor(family);
-                    fd.setInMemory(true); // metadata tables are best in memory
-                    desc.addFamily(fd);
-                }
-            }
-            hbase.createTable(desc);
-
-            logger.debug("HTable '" + tableName + "' created");
-        } finally {
-            hbase.close();
-        }
-    }
-
-    public static void deleteTable(HConnection conn, String tableName) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
-
-        try {
-            if (!tableExists(conn, tableName)) {
-                logger.debug("HTable '" + tableName + "' does not exists");
-                return;
-            }
-
-            logger.debug("delete HTable '" + tableName + "'");
-
-            if (hbase.isTableEnabled(tableName)) {
-                hbase.disableTable(tableName);
-            }
-            hbase.deleteTable(tableName);
-
-            logger.debug("HTable '" + tableName + "' deleted");
-        } finally {
-            hbase.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
deleted file mode 100644
index 6c5847e..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.persistence;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.HadoopUtil;
-
-import java.io.*;
-import java.util.*;
-
-public class HBaseResourceStore extends ResourceStore {
-
-    private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
-    private static final String FAMILY = "f";
-    private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
-    private static final String COLUMN = "c";
-    private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN);
-    private static final String COLUMN_TS = "t";
-    private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS);
-
-    private static final Map<String, String> TABLE_SUFFIX_MAP = new LinkedHashMap<String, String>();
-
-    static {
-        TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube");
-        TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict");
-        TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex");
-        TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job");
-        TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output");
-        TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj");
-        TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot");
-        TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE
-    }
-
-    final String tableNameBase;
-    final String hbaseUrl;
-
-    //    final Map<String, String> tableNameMap; // path prefix ==> HBase table name
-
-    private HConnection getConnection() throws IOException {
-        return HBaseConnection.get(hbaseUrl);
-    }
-
-    public HBaseResourceStore(KylinConfig kylinConfig) throws IOException {
-        super(kylinConfig);
-
-        String metadataUrl = kylinConfig.getMetadataUrl();
-        // split TABLE@HBASE_URL
-        int cut = metadataUrl.indexOf('@');
-        tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
-        hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
-
-        createHTableIfNeeded(getAllInOneTableName());
-
-        //        tableNameMap = new LinkedHashMap<String, String>();
-        //        for (Entry<String, String> entry : TABLE_SUFFIX_MAP.entrySet()) {
-        //            String pathPrefix = entry.getKey();
-        //            String tableName = tableNameBase + entry.getValue();
-        //            tableNameMap.put(pathPrefix, tableName);
-        //            createHTableIfNeeded(tableName);
-        //        }
-
-    }
-
-    private void createHTableIfNeeded(String tableName) throws IOException {
-        HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY);
-    }
-
-    private String getAllInOneTableName() {
-        return tableNameBase;
-    }
-
-    @Override
-    protected ArrayList<String> listResourcesImpl(String resPath) throws IOException {
-        assert resPath.startsWith("/");
-        String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/";
-        byte[] startRow = Bytes.toBytes(lookForPrefix);
-        byte[] endRow = Bytes.toBytes(lookForPrefix);
-        endRow[endRow.length - 1]++;
-
-        ArrayList<String> result = new ArrayList<String>();
-
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
-        Scan scan = new Scan(startRow, endRow);
-        scan.setFilter(new KeyOnlyFilter());
-        try {
-            ResultScanner scanner = table.getScanner(scan);
-            for (Result r : scanner) {
-                String path = Bytes.toString(r.getRow());
-                assert path.startsWith(lookForPrefix);
-                int cut = path.indexOf('/', lookForPrefix.length());
-                String child = cut < 0 ? path : path.substring(0, cut);
-                if (result.contains(child) == false)
-                    result.add(child);
-            }
-        } finally {
-            IOUtils.closeQuietly(table);
-        }
-        // return null to indicate not a folder
-        return result.isEmpty() ? null : result;
-    }
-
-    @Override
-    protected boolean existsImpl(String resPath) throws IOException {
-        Result r = getByScan(resPath, null, null);
-        return r != null;
-    }
-
-    @Override
-    protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
-        byte[] startRow = Bytes.toBytes(rangeStart);
-        byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
-
-        Scan scan = new Scan(startRow, endRow);
-        scan.addColumn(B_FAMILY, B_COLUMN_TS);
-        scan.addColumn(B_FAMILY, B_COLUMN);
-
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
-        List<RawResource> result = Lists.newArrayList();
-        try {
-            ResultScanner scanner = table.getScanner(scan);
-            for (Result r : scanner) {
-                result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
-            }
-        } catch (IOException e) {
-            for (RawResource rawResource : result) {
-                IOUtils.closeQuietly(rawResource.resource);
-            }
-            throw e;
-        } finally {
-            IOUtils.closeQuietly(table);
-        }
-        return result;
-    }
-
-    private InputStream getInputStream(String resPath, Result r) throws IOException {
-        if (r == null) {
-            return null;
-        }
-        byte[] value = r.getValue(B_FAMILY, B_COLUMN);
-        if (value.length == 0) {
-            Path redirectPath = bigCellHDFSPath(resPath);
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
-            FileSystem fileSystem = FileSystem.get(hconf);
-
-            return fileSystem.open(redirectPath);
-        } else {
-            return new ByteArrayInputStream(value);
-        }
-    }
-
-    private long getTimestamp(Result r) {
-        if (r == null) {
-            return 0;
-        } else {
-            return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS));
-        }
-    }
-
-    @Override
-    protected InputStream getResourceImpl(String resPath) throws IOException {
-        Result r = getByScan(resPath, B_FAMILY, B_COLUMN);
-        return getInputStream(resPath, r);
-    }
-
-    @Override
-    protected long getResourceTimestampImpl(String resPath) throws IOException {
-        Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS);
-        return getTimestamp(r);
-    }
-
-    @Override
-    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
-        ByteArrayOutputStream bout = new ByteArrayOutputStream();
-        IOUtils.copy(content, bout);
-        bout.close();
-
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
-        try {
-            byte[] row = Bytes.toBytes(resPath);
-            Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
-
-            table.put(put);
-            table.flushCommits();
-        } finally {
-            IOUtils.closeQuietly(table);
-        }
-    }
-
-    @Override
-    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
-        try {
-            byte[] row = Bytes.toBytes(resPath);
-            byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
-            Put put = buildPut(resPath, newTS, row, content, table);
-
-            boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
-            if (!ok) {
-                long real = getResourceTimestamp(resPath);
-                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
-            }
-
-            table.flushCommits();
-
-            return newTS;
-        } finally {
-            IOUtils.closeQuietly(table);
-        }
-    }
-
-    @Override
-    protected void deleteResourceImpl(String resPath) throws IOException {
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
-        try {
-            Delete del = new Delete(Bytes.toBytes(resPath));
-            table.delete(del);
-            table.flushCommits();
-        } finally {
-            IOUtils.closeQuietly(table);
-        }
-    }
-
-    @Override
-    protected String getReadableResourcePathImpl(String resPath) {
-        return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
-    }
-
-    private Result getByScan(String path, byte[] family, byte[] column) throws IOException {
-        byte[] startRow = Bytes.toBytes(path);
-        byte[] endRow = plusZero(startRow);
-
-        Scan scan = new Scan(startRow, endRow);
-        if (family == null || column == null) {
-            scan.setFilter(new KeyOnlyFilter());
-        } else {
-            scan.addColumn(family, column);
-        }
-
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
-        try {
-            ResultScanner scanner = table.getScanner(scan);
-            Result result = null;
-            for (Result r : scanner) {
-                result = r;
-            }
-            return result == null || result.isEmpty() ? null : result;
-        } finally {
-            IOUtils.closeQuietly(table);
-        }
-    }
-
-    private byte[] plusZero(byte[] startRow) {
-        byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1);
-        endRow[endRow.length - 1] = 0;
-        return endRow;
-    }
-
-    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
-        Path redirectPath = bigCellHDFSPath(resPath);
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
-
-        if (fileSystem.exists(redirectPath)) {
-            fileSystem.delete(redirectPath, true);
-        }
-
-        FSDataOutputStream out = fileSystem.create(redirectPath);
-
-        try {
-            out.write(largeColumn);
-        } finally {
-            IOUtils.closeQuietly(out);
-        }
-
-        return redirectPath;
-    }
-
-    public Path bigCellHDFSPath(String resPath) {
-        String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory();
-        Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath);
-        return redirectPath;
-    }
-
-    private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException {
-        int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
-        if (content.length > kvSizeLimit) {
-            writeLargeCellToHdfs(resPath, content, table);
-            content = BytesUtil.EMPTY_BYTE_ARRAY;
-        }
-
-        Put put = new Put(row);
-        put.add(B_FAMILY, B_COLUMN, content);
-        put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
-
-        return put;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 926b729..ed3a6a5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,8 +63,12 @@ abstract public class ResourceStore {
     public static final ArrayList<Class<? extends ResourceStore>> knownImpl = new ArrayList<Class<? extends ResourceStore>>();
 
     static {
-        knownImpl.add(HBaseResourceStore.class);
-        knownImpl.add(FileResourceStore.class);
+        try {
+            knownImpl.add(ClassUtil.forName("org.apache.kylin.storage.hbase.HBaseResourceStore", ResourceStore.class));
+            knownImpl.add(FileResourceStore.class);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public static ResourceStore getStore(KylinConfig kylinConfig) {
@@ -99,9 +104,9 @@ abstract public class ResourceStore {
 
     // ============================================================================
 
-    KylinConfig kylinConfig;
+    final protected KylinConfig kylinConfig;
 
-    ResourceStore(KylinConfig kylinConfig) {
+    public ResourceStore(KylinConfig kylinConfig) {
         this.kylinConfig = kylinConfig;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 743acfe..5e3a39f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.common.restclient;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -31,8 +32,7 @@ import org.apache.commons.httpclient.auth.AuthScope;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PutMethod;
 import org.apache.kylin.common.util.Bytes;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
+import org.apache.kylin.common.util.JsonUtil;
 
 /**
  * @author yangli9
@@ -111,16 +111,14 @@ public class RestClient {
         try {
             int code = client.executeMethod(request);
             String msg = Bytes.toString(request.getResponseBody());
-            JSONObject obj = new JSONObject(msg);
-            msg = obj.getString("config");
+            Map<String, String> map = JsonUtil.readValueAsMap(msg);
+            msg = map.get("config");
 
             if (code != 200)
                 throw new IOException("Invalid response " + code + " with cache wipe url " + url + "\n" + msg);
 
             return msg;
-
-        } catch (JSONException e) {
-            throw new IOException("Error when parsing json response from REST");
+            
         } finally {
             request.releaseConnection();
         }