You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 12:50:31 UTC
[6/6] incubator-kylin git commit: KYLIN-875 Refactor core-common, remove dependency on hadoop/hbase
KYLIN-875 Refactor core-common, remove dependency on hadoop/hbase
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d2456215
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d2456215
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d2456215
Branch: refs/heads/0.8
Commit: d2456215cc881358025561d0375e787fb960daf1
Parents: dd5f2b2
Author: Li, Yang <ya...@ebay.com>
Authored: Wed Jul 22 18:49:06 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Jul 22 18:49:46 2015 +0800
----------------------------------------------------------------------
core-common/pom.xml | 64 +-
.../org/apache/kylin/common/lock/JobLock.java | 9 -
.../apache/kylin/common/lock/MockJobLock.java | 15 -
.../kylin/common/lock/ZookeeperJobLock.java | 83 ---
.../org/apache/kylin/common/mr/KylinMapper.java | 33 --
.../apache/kylin/common/mr/KylinReducer.java | 31 -
.../common/persistence/HBaseConnection.java | 167 ------
.../common/persistence/HBaseResourceStore.java | 323 ----------
.../kylin/common/persistence/ResourceStore.java | 13 +-
.../kylin/common/restclient/RestClient.java | 12 +-
.../org/apache/kylin/common/util/Bytes.java | 557 ++++-------------
.../apache/kylin/common/util/BytesSplitter.java | 8 +-
.../org/apache/kylin/common/util/BytesUtil.java | 26 +-
.../common/util/HBaseRegionSizeCalculator.java | 128 ----
.../apache/kylin/common/util/HadoopUtil.java | 162 -----
.../apache/kylin/common/util/HiveClient.java | 162 -----
.../org/apache/kylin/common/util/JsonUtil.java | 18 +-
.../org/apache/kylin/common/util/TarGZUtil.java | 69 ---
.../persistence/ITHBaseResourceStoreTest.java | 208 -------
.../common/util/HBaseMetadataTestCase.java | 73 ---
.../common/util/HBaseMiniclusterHelper.java | 167 ------
.../apache/kylin/common/util/HbaseImporter.java | 121 ----
.../kylin/common/util/IdentityUtilTest.java | 9 +-
.../kylin/common/util/RandomSamplerTest.java | 11 +-
.../org/apache/kylin/common/util/RangeTest.java | 1 +
.../apache/kylin/common/util/SSHClientTest.java | 6 +-
core-cube/pom.xml | 4 +-
.../java/org/apache/kylin/cube/CubeManager.java | 8 +-
.../kylin/cube/cli/DictionaryGeneratorCLI.java | 13 +-
core-dictionary/pom.xml | 4 +-
.../apache/kylin/dict/DictionaryManager.java | 74 +--
.../dict/DistinctColumnValuesProvider.java | 17 +
.../org/apache/kylin/dict/lookup/FileTable.java | 100 ----
.../kylin/dict/lookup/FileTableReader.java | 220 -------
.../org/apache/kylin/dict/LookupTableTest.java | 76 ---
.../org/apache/kylin/dict/TableReaderTest.java | 48 --
.../java/org/apache/kylin/job/lock/JobLock.java | 9 +
.../org/apache/kylin/job/lock/MockJobLock.java | 15 +
core-metadata/pom.xml | 5 +-
engine-mr/pom.xml | 40 ++
.../apache/kylin/engine/mr/DFSFileTable.java | 99 ++++
.../kylin/engine/mr/DFSFileTableReader.java | 219 +++++++
.../org/apache/kylin/engine/mr/HadoopUtil.java | 109 ++++
.../org/apache/kylin/engine/mr/KylinMapper.java | 32 +
.../apache/kylin/engine/mr/KylinReducer.java | 30 +
.../apache/kylin/engine/mr/LookupTableTest.java | 74 +++
.../apache/kylin/engine/mr/TableReaderTest.java | 46 ++
invertedindex/pom.xml | 53 +-
.../apache/kylin/invertedindex/IIInstance.java | 24 +-
.../apache/kylin/invertedindex/IIManager.java | 6 +-
.../apache/kylin/invertedindex/tools/IICLI.java | 108 ----
job/dependency-reduced-pom.xml | 594 -------------------
job/pom.xml | 80 +--
.../engine/mr/steps/InMemCuboidMapper.java | 2 +-
.../engine/mr/steps/InMemCuboidReducer.java | 2 +-
.../mr/steps/MergeCuboidFromStorageMapper.java | 2 +-
.../engine/mr/steps/MergeStatisticsStep.java | 3 +-
.../engine/mr/steps/SaveStatisticsStep.java | 2 +-
.../java/org/apache/kylin/job/Scheduler.java | 2 +-
.../apache/kylin/job/common/HqlExecutable.java | 3 +-
.../kylin/job/hadoop/AbstractHadoopJob.java | 2 +-
.../cardinality/ColumnCardinalityMapper.java | 2 +-
.../cardinality/ColumnCardinalityReducer.java | 3 +-
.../job/hadoop/cube/BaseCuboidMapperBase.java | 3 +-
.../kylin/job/hadoop/cube/CubeHFileMapper.java | 2 +-
.../kylin/job/hadoop/cube/CuboidReducer.java | 3 +-
.../cube/FactDistinctColumnsCombiner.java | 3 +-
.../cube/FactDistinctColumnsMapperBase.java | 2 +-
.../hadoop/cube/FactDistinctColumnsReducer.java | 3 +-
.../job/hadoop/cube/MergeCuboidMapper.java | 3 +-
.../kylin/job/hadoop/cube/NDCuboidMapper.java | 4 +-
.../job/hadoop/cube/NewBaseCuboidMapper.java | 2 +-
.../hadoop/cube/RangeKeyDistributionMapper.java | 2 +-
.../cube/RangeKeyDistributionReducer.java | 3 +-
.../cube/RowKeyDistributionCheckerMapper.java | 3 +-
.../cube/RowKeyDistributionCheckerReducer.java | 3 +-
.../job/hadoop/dict/CreateDictionaryJob.java | 20 +-
.../dict/CreateInvertedIndexDictionaryJob.java | 19 +-
.../kylin/job/hadoop/hbase/CreateHTableJob.java | 2 +-
.../job/hadoop/invertedindex/IIBulkLoadJob.java | 16 +-
.../hadoop/invertedindex/IICreateHFileJob.java | 4 +-
.../invertedindex/IICreateHFileMapper.java | 2 +-
.../hadoop/invertedindex/IICreateHTableJob.java | 4 +-
.../IIDistinctColumnsCombiner.java | 3 +-
.../invertedindex/IIDistinctColumnsJob.java | 2 +-
.../invertedindex/IIDistinctColumnsMapper.java | 3 +-
.../invertedindex/IIDistinctColumnsReducer.java | 3 +-
.../hadoop/invertedindex/InvertedIndexJob.java | 2 +-
.../invertedindex/InvertedIndexMapper.java | 2 +-
.../invertedindex/InvertedIndexReducer.java | 2 +-
.../RandomKeyDistributionMapper.java | 3 +-
.../RandomKeyDistributionReducer.java | 3 +-
.../job/impl/threadpool/DefaultScheduler.java | 3 +-
.../kylin/job/streaming/CubeStreamConsumer.java | 4 +-
.../kylin/job/streaming/StreamingBootstrap.java | 5 +-
.../kylin/job/tools/DeployCoprocessorCLI.java | 5 +-
.../job/tools/GridTableHBaseBenchmark.java | 2 +-
.../java/org/apache/kylin/job/tools/IICLI.java | 108 ++++
.../org/apache/kylin/job/tools/TarGZUtil.java | 69 +++
.../apache/kylin/source/hive/HiveMRInput.java | 2 +-
.../source/hive/HiveSourceTableLoader.java | 4 +-
.../org/apache/kylin/source/hive/HiveTable.java | 5 +-
.../kylin/storage/hbase/HBaseMROutput2.java | 2 +-
.../kylin/job/BuildCubeWithEngineTest.java | 4 +-
.../kylin/job/BuildCubeWithStreamTest.java | 2 +-
.../apache/kylin/job/BuildIIWithEngineTest.java | 5 +-
.../apache/kylin/job/BuildIIWithStreamTest.java | 2 +-
.../kylin/job/DeployLocalMetaToRemoteTest.java | 2 +-
.../java/org/apache/kylin/job/DeployUtil.java | 3 +-
.../org/apache/kylin/job/ExportHBaseData.java | 5 +-
.../job/ITKafkaBasedIIStreamBuilderTest.java | 2 +-
.../org/apache/kylin/job/ImportHBaseData.java | 9 +-
.../cubev2/FactDistinctColumnsReducerTest.java | 3 +-
.../kylin/job/hadoop/hdfs/ITHdfsOpsTest.java | 2 +-
.../job/impl/threadpool/BaseSchedulerTest.java | 4 +-
.../job/inmemcubing/InMemCubeBuilderTest.java | 6 +-
.../job/streaming/CubeStreamConsumerTest.java | 3 +-
.../hive/ITHiveSourceTableLoaderTest.java | 2 +-
.../source/hive/ITHiveTableReaderTest.java | 2 +-
.../source/hive/ITSnapshotManagerTest.java | 2 +-
query/pom.xml | 21 +-
.../kylin/query/test/ITKylinQueryTest.java | 2 +-
server/pom.xml | 25 +-
.../kylin/rest/controller/JobController.java | 2 +-
.../rest/security/RealAclHBaseStorage.java | 2 +-
.../apache/kylin/rest/service/CubeService.java | 8 +-
.../apache/kylin/rest/service/QueryService.java | 2 +-
.../org/apache/kylin/jdbc/ITJDBCDriverTest.java | 2 +-
source-hive/pom.xml | 11 +
.../apache/kylin/source/hive/HiveClient.java | 162 +++++
storage-hbase/pom.xml | 38 ++
.../kylin/storage/hbase/HBaseConnection.java | 234 ++++++++
.../hbase/HBaseRegionSizeCalculator.java | 128 ++++
.../kylin/storage/hbase/HBaseResourceStore.java | 326 ++++++++++
.../kylin/storage/hbase/ZookeeperJobLock.java | 83 +++
.../storage/hbase/HBaseMetadataTestCase.java | 75 +++
.../storage/hbase/HBaseMiniclusterHelper.java | 167 ++++++
.../kylin/storage/hbase/HbaseImporter.java | 121 ++++
.../storage/hbase/ITHBaseResourceStoreTest.java | 211 +++++++
storage/pom.xml | 30 +-
.../storage/cube/CubeHBaseReadonlyStore.java | 2 +-
.../gridtable/diskstore/HadoopFileSystem.java | 2 +-
.../kylin/storage/hbase/CubeStorageQuery.java | 2 +-
.../hbase/InvertedIndexStorageQuery.java | 2 +-
.../kylin/storage/hbase/PingHBaseCLI.java | 6 +-
.../storage/hbase/ITInvertedIndexHBaseTest.java | 10 +-
.../kylin/storage/test/ITStorageTest.java | 2 +-
streaming/pom.xml | 45 +-
.../invertedindex/IIStreamConsumer.java | 3 +-
.../streaming/invertedindex/SliceBuilder.java | 45 +-
150 files changed, 2924 insertions(+), 3844 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index c71a4ad..adb189e 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -60,6 +60,10 @@
<artifactId>commons-email</artifactId>
</dependency>
<dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
@@ -76,60 +80,30 @@
<artifactId>compress-lzf</artifactId>
</dependency>
- <!-- Env & Test -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
+ <!-- Logging -->
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>provided</scope>
- <!-- protobuf version conflict with hbase -->
- <exclusions>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
- <scope>provided</scope>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <scope>provided</scope>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <scope>provided</scope>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
</dependency>
+
+ <!-- Env & Test -->
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-testing-util</artifactId>
- <version>${hbase-hadoop2.version}</version>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet.jsp</groupId>
- <artifactId>jsp-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-hcatalog-core</artifactId>
- <version>${hive-hcatalog.version}</version>
- <scope>provided</scope>
- </dependency>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
deleted file mode 100644
index 7fdb64c..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.kylin.common.lock;
-
-/**
- */
-public interface JobLock {
- boolean lock();
-
- void unlock();
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
deleted file mode 100644
index 230d4d8..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.kylin.common.lock;
-
-/**
- */
-public class MockJobLock implements JobLock {
- @Override
- public boolean lock() {
- return true;
- }
-
- @Override
- public void unlock() {
- return;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
deleted file mode 100644
index 603894c..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.kylin.common.lock;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- */
-public class ZookeeperJobLock implements JobLock {
- private Logger logger = LoggerFactory.getLogger(ZookeeperJobLock.class);
-
- private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-
- private String scheduleID;
- private InterProcessMutex sharedLock;
- private CuratorFramework zkClient;
-
- @Override
- public boolean lock() {
- this.scheduleID = schedulerId();
- String ZKConnectString = getZKConnectString();
- if (StringUtils.isEmpty(ZKConnectString)) {
- throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
- }
-
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- this.zkClient = CuratorFrameworkFactory.newClient(ZKConnectString, retryPolicy);
- this.zkClient.start();
- this.sharedLock = new InterProcessMutex(zkClient, this.scheduleID);
- boolean hasLock = false;
- try {
- hasLock = sharedLock.acquire(3, TimeUnit.SECONDS);
- } catch (Exception e) {
- logger.warn("error acquire lock", e);
- }
- if (!hasLock) {
- logger.warn("fail to acquire lock, scheduler has not been started");
- zkClient.close();
- return false;
- }
- return true;
- }
-
- @Override
- public void unlock() {
- releaseLock();
- }
-
- private String getZKConnectString() {
- Configuration conf = HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl());
- return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- }
-
- private void releaseLock() {
- try {
- if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
- // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
- if (zkClient.checkExists().forPath(scheduleID) != null) {
- zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(scheduleID);
- }
- }
- } catch (Exception e) {
- logger.error("error release lock:" + scheduleID);
- throw new RuntimeException(e);
- }
- }
-
- private String schedulerId() {
- return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java b/core-common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
deleted file mode 100644
index fd0b337..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.mr;
-
-
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- */
-public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
-
- protected void bindCurrentConfiguration(Configuration conf) {
- HadoopUtil.setCurrentConfiguration(conf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/mr/KylinReducer.java b/core-common/src/main/java/org/apache/kylin/common/mr/KylinReducer.java
deleted file mode 100644
index bc63df7..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/mr/KylinReducer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.mr;
-
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- */
-public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
- protected void bindCurrentConfiguration(Configuration conf) {
- HadoopUtil.setCurrentConfiguration(conf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
deleted file mode 100644
index a574d0b..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.persistence;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author yangli9
- *
- */
-public class HBaseConnection {
-
- private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
-
- private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
- private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>();
-
- static {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- for (HConnection conn : ConnPool.values()) {
- try {
- conn.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- });
- }
-
- // returned HConnection can be shared by multiple threads and does not require close()
- public static HConnection get(String url) {
- // find configuration
- Configuration conf = ConfigCache.get(url);
- if (conf == null) {
- conf = HadoopUtil.newHBaseConfiguration(url);
- ConfigCache.put(url, conf);
- }
-
- HConnection connection = ConnPool.get(url);
- try {
- while (true) {
- // I don't use DCL since recreate a connection is not a big issue.
- if (connection == null || connection.isClosed()) {
- logger.info("connection is null or closed, creating a new one");
- connection = HConnectionManager.createConnection(conf);
- ConnPool.put(url, connection);
- }
-
- if (connection == null || connection.isClosed()) {
- Thread.sleep(10000);// wait a while and retry
- } else {
- break;
- }
- }
-
- } catch (Throwable t) {
- logger.error("Error when open connection " + url, t);
- throw new StorageException("Error when open connection " + url, t);
- }
-
- return connection;
- }
-
- public static boolean tableExists(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
- try {
- return hbase.tableExists(TableName.valueOf(tableName));
- } finally {
- hbase.close();
- }
- }
-
- public static boolean tableExists(String hbaseUrl, String tableName) throws IOException {
- return tableExists(HBaseConnection.get(hbaseUrl), tableName);
- }
-
- public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException {
- createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
- }
-
- public static void deleteTable(String hbaseUrl, String tableName) throws IOException {
- deleteTable(HBaseConnection.get(hbaseUrl), tableName);
- }
-
- public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
-
- try {
- if (tableExists(conn, tableName)) {
- logger.debug("HTable '" + tableName + "' already exists");
- return;
- }
-
- logger.debug("Creating HTable '" + tableName + "'");
-
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-
- if (null != families && families.length > 0) {
- for (String family : families) {
- HColumnDescriptor fd = new HColumnDescriptor(family);
- fd.setInMemory(true); // metadata tables are best in memory
- desc.addFamily(fd);
- }
- }
- hbase.createTable(desc);
-
- logger.debug("HTable '" + tableName + "' created");
- } finally {
- hbase.close();
- }
- }
-
- public static void deleteTable(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
-
- try {
- if (!tableExists(conn, tableName)) {
- logger.debug("HTable '" + tableName + "' does not exists");
- return;
- }
-
- logger.debug("delete HTable '" + tableName + "'");
-
- if (hbase.isTableEnabled(tableName)) {
- hbase.disableTable(tableName);
- }
- hbase.deleteTable(tableName);
-
- logger.debug("HTable '" + tableName + "' deleted");
- } finally {
- hbase.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
deleted file mode 100644
index 6c5847e..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.persistence;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.HadoopUtil;
-
-import java.io.*;
-import java.util.*;
-
-public class HBaseResourceStore extends ResourceStore {
-
- private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
- private static final String FAMILY = "f";
- private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
- private static final String COLUMN = "c";
- private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN);
- private static final String COLUMN_TS = "t";
- private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS);
-
- private static final Map<String, String> TABLE_SUFFIX_MAP = new LinkedHashMap<String, String>();
-
- static {
- TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube");
- TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict");
- TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex");
- TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job");
- TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output");
- TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj");
- TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot");
- TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE
- }
-
- final String tableNameBase;
- final String hbaseUrl;
-
- // final Map<String, String> tableNameMap; // path prefix ==> HBase table name
-
- private HConnection getConnection() throws IOException {
- return HBaseConnection.get(hbaseUrl);
- }
-
- public HBaseResourceStore(KylinConfig kylinConfig) throws IOException {
- super(kylinConfig);
-
- String metadataUrl = kylinConfig.getMetadataUrl();
- // split TABLE@HBASE_URL
- int cut = metadataUrl.indexOf('@');
- tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
- hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
-
- createHTableIfNeeded(getAllInOneTableName());
-
- // tableNameMap = new LinkedHashMap<String, String>();
- // for (Entry<String, String> entry : TABLE_SUFFIX_MAP.entrySet()) {
- // String pathPrefix = entry.getKey();
- // String tableName = tableNameBase + entry.getValue();
- // tableNameMap.put(pathPrefix, tableName);
- // createHTableIfNeeded(tableName);
- // }
-
- }
-
- private void createHTableIfNeeded(String tableName) throws IOException {
- HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY);
- }
-
- private String getAllInOneTableName() {
- return tableNameBase;
- }
-
- @Override
- protected ArrayList<String> listResourcesImpl(String resPath) throws IOException {
- assert resPath.startsWith("/");
- String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/";
- byte[] startRow = Bytes.toBytes(lookForPrefix);
- byte[] endRow = Bytes.toBytes(lookForPrefix);
- endRow[endRow.length - 1]++;
-
- ArrayList<String> result = new ArrayList<String>();
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- Scan scan = new Scan(startRow, endRow);
- scan.setFilter(new KeyOnlyFilter());
- try {
- ResultScanner scanner = table.getScanner(scan);
- for (Result r : scanner) {
- String path = Bytes.toString(r.getRow());
- assert path.startsWith(lookForPrefix);
- int cut = path.indexOf('/', lookForPrefix.length());
- String child = cut < 0 ? path : path.substring(0, cut);
- if (result.contains(child) == false)
- result.add(child);
- }
- } finally {
- IOUtils.closeQuietly(table);
- }
- // return null to indicate not a folder
- return result.isEmpty() ? null : result;
- }
-
- @Override
- protected boolean existsImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, null, null);
- return r != null;
- }
-
- @Override
- protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
- byte[] startRow = Bytes.toBytes(rangeStart);
- byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
-
- Scan scan = new Scan(startRow, endRow);
- scan.addColumn(B_FAMILY, B_COLUMN_TS);
- scan.addColumn(B_FAMILY, B_COLUMN);
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- List<RawResource> result = Lists.newArrayList();
- try {
- ResultScanner scanner = table.getScanner(scan);
- for (Result r : scanner) {
- result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
- }
- } catch (IOException e) {
- for (RawResource rawResource : result) {
- IOUtils.closeQuietly(rawResource.resource);
- }
- throw e;
- } finally {
- IOUtils.closeQuietly(table);
- }
- return result;
- }
-
- private InputStream getInputStream(String resPath, Result r) throws IOException {
- if (r == null) {
- return null;
- }
- byte[] value = r.getValue(B_FAMILY, B_COLUMN);
- if (value.length == 0) {
- Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- return fileSystem.open(redirectPath);
- } else {
- return new ByteArrayInputStream(value);
- }
- }
-
- private long getTimestamp(Result r) {
- if (r == null) {
- return 0;
- } else {
- return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS));
- }
- }
-
- @Override
- protected InputStream getResourceImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, B_FAMILY, B_COLUMN);
- return getInputStream(resPath, r);
- }
-
- @Override
- protected long getResourceTimestampImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS);
- return getTimestamp(r);
- }
-
- @Override
- protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- IOUtils.copy(content, bout);
- bout.close();
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- try {
- byte[] row = Bytes.toBytes(resPath);
- Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
-
- table.put(put);
- table.flushCommits();
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- @Override
- protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- try {
- byte[] row = Bytes.toBytes(resPath);
- byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
- Put put = buildPut(resPath, newTS, row, content, table);
-
- boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
- if (!ok) {
- long real = getResourceTimestamp(resPath);
- throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
- }
-
- table.flushCommits();
-
- return newTS;
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- @Override
- protected void deleteResourceImpl(String resPath) throws IOException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- try {
- Delete del = new Delete(Bytes.toBytes(resPath));
- table.delete(del);
- table.flushCommits();
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- @Override
- protected String getReadableResourcePathImpl(String resPath) {
- return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
- }
-
- private Result getByScan(String path, byte[] family, byte[] column) throws IOException {
- byte[] startRow = Bytes.toBytes(path);
- byte[] endRow = plusZero(startRow);
-
- Scan scan = new Scan(startRow, endRow);
- if (family == null || column == null) {
- scan.setFilter(new KeyOnlyFilter());
- } else {
- scan.addColumn(family, column);
- }
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- try {
- ResultScanner scanner = table.getScanner(scan);
- Result result = null;
- for (Result r : scanner) {
- result = r;
- }
- return result == null || result.isEmpty() ? null : result;
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- private byte[] plusZero(byte[] startRow) {
- byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1);
- endRow[endRow.length - 1] = 0;
- return endRow;
- }
-
- private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
- Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- if (fileSystem.exists(redirectPath)) {
- fileSystem.delete(redirectPath, true);
- }
-
- FSDataOutputStream out = fileSystem.create(redirectPath);
-
- try {
- out.write(largeColumn);
- } finally {
- IOUtils.closeQuietly(out);
- }
-
- return redirectPath;
- }
-
- public Path bigCellHDFSPath(String resPath) {
- String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory();
- Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath);
- return redirectPath;
- }
-
- private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException {
- int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
- if (content.length > kvSizeLimit) {
- writeLargeCellToHdfs(resPath, content, table);
- content = BytesUtil.EMPTY_BYTE_ARRAY;
- }
-
- Put put = new Put(row);
- put.add(B_FAMILY, B_COLUMN, content);
- put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
-
- return put;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 926b729..ed3a6a5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,8 +63,12 @@ abstract public class ResourceStore {
public static final ArrayList<Class<? extends ResourceStore>> knownImpl = new ArrayList<Class<? extends ResourceStore>>();
static {
- knownImpl.add(HBaseResourceStore.class);
- knownImpl.add(FileResourceStore.class);
+ try {
+ knownImpl.add(ClassUtil.forName("org.apache.kylin.storage.hbase.HBaseResourceStore", ResourceStore.class));
+ knownImpl.add(FileResourceStore.class);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
}
public static ResourceStore getStore(KylinConfig kylinConfig) {
@@ -99,9 +104,9 @@ abstract public class ResourceStore {
// ============================================================================
- KylinConfig kylinConfig;
+ final protected KylinConfig kylinConfig;
- ResourceStore(KylinConfig kylinConfig) {
+ public ResourceStore(KylinConfig kylinConfig) {
this.kylinConfig = kylinConfig;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 743acfe..5e3a39f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -19,6 +19,7 @@
package org.apache.kylin.common.restclient;
import java.io.IOException;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -31,8 +32,7 @@ import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PutMethod;
import org.apache.kylin.common.util.Bytes;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
+import org.apache.kylin.common.util.JsonUtil;
/**
* @author yangli9
@@ -111,16 +111,14 @@ public class RestClient {
try {
int code = client.executeMethod(request);
String msg = Bytes.toString(request.getResponseBody());
- JSONObject obj = new JSONObject(msg);
- msg = obj.getString("config");
+ Map<String, String> map = JsonUtil.readValueAsMap(msg);
+ msg = map.get("config");
if (code != 200)
throw new IOException("Invalid response " + code + " with cache wipe url " + url + "\n" + msg);
return msg;
-
- } catch (JSONException e) {
- throw new IOException("Error when parsing json response from REST");
+
} finally {
request.releaseConnection();
}