You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 12:50:27 UTC
[2/6] incubator-kylin git commit: KYLIN-875 Refactor core-common,
remove dependency on hadoop/hbase
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/main/java/org/apache/kylin/job/tools/IICLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/IICLI.java b/job/src/main/java/org/apache/kylin/job/tools/IICLI.java
new file mode 100644
index 0000000..4e38309
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/IICLI.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.tools;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Command-line tool that prints every record stored in an inverted-index (II) sequence file.
+ *
+ * <p>Usage: {@code IICLI <iiName> <sequenceFilePath>}
+ *
+ * @author yangli9
+ */
+public class IICLI {
+
+    /**
+     * Decodes the given sequence file using the layout of the named II's first segment and prints
+     * each record, followed by a total count.
+     *
+     * @param args {@code args[0]} is the name of the II instance whose first segment describes the
+     *             record layout; {@code args[1]} is the path of the sequence file to read
+     * @throws IOException if the sequence file cannot be opened
+     * @throws IllegalArgumentException if fewer than two arguments are given or the II is unknown
+     */
+    public static void main(String[] args) throws IOException {
+        if (args == null || args.length < 2) {
+            throw new IllegalArgumentException("Usage: IICLI <iiName> <sequenceFilePath>");
+        }
+
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        String iiName = args[0];
+        IIInstance ii = mgr.getII(iiName);
+        // Fail with a clear message rather than an NPE on ii.getFirstSegment() below.
+        if (ii == null) {
+            throw new IllegalArgumentException("Inverted index '" + iiName + "' not found");
+        }
+
+        String path = args[1];
+        System.out.println("Reading from " + path + " ...");
+
+        TableRecordInfo info = new TableRecordInfo(ii.getFirstSegment());
+        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+        int count = 0;
+        for (Slice slice : codec.decodeKeyValue(readSequenceKVs(hconf, path))) {
+            for (RawTableRecord rec : slice) {
+                // println, not printf: record text may contain '%' and must never be used as a format string.
+                System.out.println(new TableRecord(rec, info).toString());
+                count++;
+            }
+        }
+        System.out.println("Total " + count + " records");
+    }
+
+    /**
+     * Exposes the key/value pairs of a sequence file as an {@link Iterable} of {@link IIRow}.
+     *
+     * <p>The reader is opened eagerly and closed once iteration reaches the end of the file or a read
+     * fails. Only a single pass is supported, because every iterator shares the same underlying reader.
+     * The returned {@link IIRow} and its key/value writables are reused between calls to
+     * {@code next()}, so callers must consume or copy each row before advancing.
+     *
+     * @param hconf Hadoop configuration used to open the file
+     * @param path  path of the sequence file
+     * @return a single-pass iterable over the file's rows
+     * @throws IOException if the sequence file cannot be opened
+     */
+    public static Iterable<IIRow> readSequenceKVs(Configuration hconf, String path) throws IOException {
+        final Reader reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
+        return new Iterable<IIRow>() {
+            @Override
+            public Iterator<IIRow> iterator() {
+                return new Iterator<IIRow>() {
+                    final ImmutableBytesWritable k = new ImmutableBytesWritable();
+                    final ImmutableBytesWritable v = new ImmutableBytesWritable();
+                    final IIRow pair = new IIRow(k, v, null);
+                    // true when k/v hold a row that has been read but not yet handed out by next()
+                    boolean pending = false;
+                    // true once the reader reported end-of-file (or failed) and has been closed
+                    boolean finished = false;
+
+                    @Override
+                    public boolean hasNext() {
+                        // hasNext() must be idempotent: only advance the reader when no row is pending.
+                        if (pending) {
+                            return true;
+                        }
+                        if (finished) {
+                            return false;
+                        }
+                        boolean hasRow = false;
+                        try {
+                            hasRow = reader.next(k, v);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            if (!hasRow) {
+                                finished = true;
+                                IOUtils.closeQuietly(reader);
+                            }
+                        }
+                        pending = hasRow;
+                        return hasRow;
+                    }
+
+                    @Override
+                    public IIRow next() {
+                        if (!hasNext()) {
+                            throw new java.util.NoSuchElementException();
+                        }
+                        pending = false;
+                        return pair;
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/main/java/org/apache/kylin/job/tools/TarGZUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/TarGZUtil.java b/job/src/main/java/org/apache/kylin/job/tools/TarGZUtil.java
new file mode 100644
index 0000000..2ce5889
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/TarGZUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.tools;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+
+/**
+ * Utility for extracting {@code .tar.gz} archives onto the local file system.
+ */
+public class TarGZUtil {
+
+    /**
+     * Extracts every entry of a gzip-compressed tar archive into {@code dest}.
+     *
+     * <p>The destination directory, and any missing parent directories, is created if needed.
+     * Parent directories of file entries are created even when the archive has no explicit
+     * directory entry for them. Entries whose resolved path falls outside {@code dest} (for
+     * example names containing {@code ../}) are rejected. All streams are closed even when
+     * extraction fails.
+     *
+     * @param tarFile the {@code .tar.gz} archive to read
+     * @param dest    directory to extract into
+     * @throws IOException if the archive cannot be read, a file or directory cannot be created,
+     *                     or an entry would escape the destination directory
+     */
+    public static void uncompressTarGZ(File tarFile, File dest) throws IOException {
+        ensureDirectory(dest);
+        File destRoot = dest.getCanonicalFile();
+
+        FileInputStream fileIn = new FileInputStream(tarFile);
+        TarArchiveInputStream tarIn = null;
+        try {
+            // The gzip constructor reads the header and may throw; fileIn is closed in finally then.
+            tarIn = new TarArchiveInputStream(new GzipCompressorInputStream(new BufferedInputStream(fileIn)));
+            extractEntries(tarIn, destRoot);
+        } finally {
+            if (tarIn != null) {
+                tarIn.close(); // closes the whole stream chain, including fileIn
+            } else {
+                fileIn.close();
+            }
+        }
+    }
+
+    /** Walks all entries of the archive and materializes each one under {@code destRoot}. */
+    private static void extractEntries(TarArchiveInputStream tarIn, File destRoot) throws IOException {
+        byte[] buffer = new byte[1024];
+        for (TarArchiveEntry entry = tarIn.getNextTarEntry(); entry != null; entry = tarIn.getNextTarEntry()) {
+            File target = resolveEntry(destRoot, entry.getName());
+            System.out.println("working: " + target.getPath());
+            if (entry.isDirectory()) {
+                ensureDirectory(target);
+            } else {
+                File parent = target.getParentFile();
+                if (parent != null) {
+                    ensureDirectory(parent);
+                }
+                copyCurrentEntry(tarIn, target, buffer);
+            }
+        }
+    }
+
+    /** Resolves an entry name against the canonical {@code destRoot}, rejecting paths that escape it. */
+    private static File resolveEntry(File destRoot, String entryName) throws IOException {
+        File target = new File(destRoot, entryName).getCanonicalFile();
+        String rootPath = destRoot.getPath();
+        String rootPrefix = rootPath.endsWith(File.separator) ? rootPath : rootPath + File.separator;
+        String targetPath = target.getPath();
+        if (!targetPath.equals(rootPath) && !targetPath.startsWith(rootPrefix)) {
+            throw new IOException("Tar entry '" + entryName + "' would be extracted outside " + rootPath);
+        }
+        return target;
+    }
+
+    /** Copies the bytes of the archive's current entry into {@code target}, always closing the file. */
+    private static void copyCurrentEntry(TarArchiveInputStream tarIn, File target, byte[] buffer) throws IOException {
+        BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(target));
+        try {
+            int len;
+            // read() returns -1 at the end of the current entry, not the end of the archive.
+            while ((len = tarIn.read(buffer)) != -1) {
+                out.write(buffer, 0, len);
+            }
+        } finally {
+            out.close();
+        }
+    }
+
+    /** Creates {@code dir} (with parents) if missing; fails if it cannot be made a directory. */
+    private static void ensureDirectory(File dir) throws IOException {
+        if (!dir.isDirectory() && !dir.mkdirs() && !dir.isDirectory()) {
+            throw new IOException("Unable to create directory " + dir.getPath());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index f005bc9..c40924e 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.JoinedFlatTable;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 76b9bab..31878a8 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -21,11 +21,11 @@ package org.apache.kylin.source.hive;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
index 02e7c45..c09b6fc 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
@@ -21,9 +21,8 @@ package org.apache.kylin.source.hive;
import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HiveClient;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.dict.lookup.FileTable;
+import org.apache.kylin.engine.mr.DFSFileTable;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.ReadableTable;
import org.slf4j.Logger;
@@ -54,7 +53,7 @@ public class HiveTable implements ReadableTable {
public TableSignature getSignature() throws IOException {
try {
String path = computeHDFSLocation();
- Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path);
+ Pair<Long, Long> sizeAndLastModified = DFSFileTable.getSizeAndLastModified(path);
long size = sizeAndLastModified.getFirst();
long lastModified = sizeAndLastModified.getSecond();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
index cd156b0..b5f6d65 100644
--- a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
@@ -62,6 +61,7 @@ import org.apache.kylin.cube.kv.RowValueDecoder;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.measure.MeasureCodec;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 5423ddd..1b626d2 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -34,10 +34,8 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.ZookeeperJobLock;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -50,6 +48,8 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.ZookeeperJobLock;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index b1aad82..6a29bb8 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -38,9 +38,9 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.job.streaming.BootstrapConfig;
import org.apache.kylin.job.streaming.StreamingBootstrap;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.apache.kylin.streaming.StreamingConfig;
import org.apache.kylin.streaming.StreamingManager;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
index 2382540..41457ed 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -19,14 +19,13 @@
package org.apache.kylin.job;
import com.google.common.collect.Lists;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.ZookeeperJobLock;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
@@ -39,6 +38,8 @@ import org.apache.kylin.job.invertedindex.IIJob;
import org.apache.kylin.job.invertedindex.IIJobBuilder;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.ZookeeperJobLock;
import org.junit.*;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 1f91b25..071ae3a 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -55,7 +55,6 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
@@ -69,6 +68,7 @@ import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.source.hive.HiveTableReader;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java b/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
index 276cf67..ff23040 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.junit.*;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
index 98927bb..7ebfac0 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -19,6 +19,7 @@
package org.apache.kylin.job;
import com.google.common.collect.Lists;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
@@ -27,7 +28,6 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.HiveClient;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.CubeInstance;
@@ -40,6 +40,7 @@ import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.hive.HiveClient;
import org.apache.kylin.streaming.StreamMessage;
import org.apache.kylin.streaming.StreamingConfig;
import org.apache.kylin.streaming.TimedJsonStreamParser;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
index 00ecb23..e3120f3 100644
--- a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
+++ b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
@@ -25,14 +25,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.util.HBaseMiniclusterHelper;
import org.codehaus.plexus.util.FileUtils;
-
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.SSHClient;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.HBaseMiniclusterHelper;
public class ExportHBaseData {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
index 71c9644..17d907c 100644
--- a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
@@ -37,8 +37,8 @@ package org.apache.kylin.job;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.job.streaming.StreamingBootstrap;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/ImportHBaseData.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ImportHBaseData.java b/job/src/test/java/org/apache/kylin/job/ImportHBaseData.java
index 0de026a..8aea870 100644
--- a/job/src/test/java/org/apache/kylin/job/ImportHBaseData.java
+++ b/job/src/test/java/org/apache/kylin/job/ImportHBaseData.java
@@ -27,15 +27,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
-
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.common.persistence.HBaseResourceStore;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.HBaseMiniclusterHelper;
import org.apache.kylin.common.util.SSHClient;
-import org.apache.kylin.common.util.TarGZUtil;
+import org.apache.kylin.job.tools.TarGZUtil;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.HBaseMiniclusterHelper;
+import org.apache.kylin.storage.hbase.HBaseResourceStore;
public class ImportHBaseData {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
index 2e2a945..b85afcc 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
@@ -1,11 +1,12 @@
package org.apache.kylin.job.hadoop.cubev2;
import com.google.common.collect.Maps;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
index 8710681..69519e0 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 29b8304..20208f2 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -19,14 +19,14 @@
package org.apache.kylin.job.impl.threadpool;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.ZookeeperJobLock;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.ZookeeperJobLock;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index 9600ef7..3caa1b0 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -39,7 +39,7 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.lookup.FileTableReader;
+import org.apache.kylin.engine.mr.DFSFileTableReader;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.gridtable.GTRecord;
import org.junit.AfterClass;
@@ -133,7 +133,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
distinctSets[i] = new TreeSet<String>();
// get distinct values on each column
- FileTableReader reader = new FileTableReader(flatTable, nColumns);
+ DFSFileTableReader reader = new DFSFileTableReader(flatTable, nColumns);
while (count > 0 && reader.next()) {
String[] row = reader.getRow();
for (int i = 0; i < nColumns; i++)
@@ -183,7 +183,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
List<byte[]> result = Lists.newArrayList();
- FileTableReader reader = new FileTableReader(flatTable, nColumns);
+ DFSFileTableReader reader = new DFSFileTableReader(flatTable, nColumns);
while (reader.next()) {
String[] row = reader.getRow();
if (row[c] != null) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index b246a73..912d218 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -1,16 +1,17 @@
package org.apache.kylin.job.streaming;
import com.google.common.collect.Lists;
+
import org.apache.hadoop.util.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index de6df2c..019dc56 100644
--- a/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -24,8 +24,8 @@ import java.io.IOException;
import java.util.Set;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.source.hive.HiveSourceTableLoader;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
index 624f158..ac2275a 100644
--- a/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
@@ -18,8 +18,8 @@
package org.apache.kylin.source.hive;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.source.hive.HiveTableReader;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
index 975d69f..2b44f9d 100644
--- a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -20,7 +20,6 @@ package org.apache.kylin.source.hive;
import static org.junit.Assert.*;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.metadata.MetadataManager;
@@ -28,6 +27,7 @@ import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.ReadableTable;
import org.apache.kylin.source.ReadableTable.TableReader;
import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/query/pom.xml
----------------------------------------------------------------------
diff --git a/query/pom.xml b/query/pom.xml
index 604f03c..054124e 100644
--- a/query/pom.xml
+++ b/query/pom.xml
@@ -38,13 +38,6 @@
<dependencies>
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-common</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
<artifactId>atopcalcite</artifactId>
<version>${project.parent.version}</version>
</dependency>
@@ -77,6 +70,20 @@
<!-- Env & Test -->
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-storage-hbase</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
index a595dc9..48578af 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -20,11 +20,11 @@ package org.apache.kylin.query.test;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.query.enumerator.OLAPQuery;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
import org.dbunit.database.DatabaseConnection;
import org.dbunit.database.IDatabaseConnection;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 0b74879..a757522 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -38,14 +38,6 @@
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-common</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- <version>${project.parent.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kylin</groupId>
<artifactId>kylin-query</artifactId>
<version>${project.parent.version}</version>
<exclusions>
@@ -70,6 +62,22 @@
</exclusion>
</exclusions>
</dependency>
+
+ <!-- Test & Env -->
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-storage-hbase</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-job</artifactId>
@@ -77,6 +85,7 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
<!-- depends on kylin-jdbc just for running jdbc test cases in server module -->
<dependency>
<groupId>org.apache.kylin</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 6427a98..5e40e12 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -19,11 +19,11 @@
package org.apache.kylin.rest.controller;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.JobLock;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.request.JobListRequest;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
index 553e157..08b027b 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
@@ -2,9 +2,9 @@ package org.apache.kylin.rest.security;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.rest.service.AclService;
import org.apache.kylin.rest.service.UserService;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.h2.util.StringUtils;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index f3f81a4..1e55b2c 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,11 +29,10 @@ import java.util.Set;
import java.util.WeakHashMap;
import com.google.common.collect.Lists;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseRegionSizeCalculator;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -42,6 +41,7 @@ import org.apache.kylin.cube.cuboid.CuboidCLI;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.BuildEngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.job.common.HadoopShellExecutable;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -64,6 +64,8 @@ import org.apache.kylin.rest.response.HBaseResponse;
import org.apache.kylin.rest.response.MetricsResponse;
import org.apache.kylin.rest.security.AclPermission;
import org.apache.kylin.source.hive.HiveSourceTableLoader;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.HBaseRegionSizeCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -444,7 +446,7 @@ public class CubeService extends BasicService {
// Get HBase storage conf.
String hbaseUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
- Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl);
+ Configuration hconf = HBaseConnection.newHBaseConfiguration(hbaseUrl);
HTable table = null;
HBaseResponse hr = null;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 2a78e21..3e62483 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -55,6 +54,7 @@ import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.util.QueryUtil;
import org.apache.kylin.rest.util.Serializer;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.h2.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java b/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
index 08f5217..67cc573 100644
--- a/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
+++ b/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
@@ -22,7 +22,7 @@ package org.apache.kylin.jdbc;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.webapp.WebAppContext;
import org.junit.*;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/source-hive/pom.xml
----------------------------------------------------------------------
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index fcfd953..17a918e 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -43,6 +43,17 @@
<!-- Env & Test -->
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <version>${hive-hcatalog.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
new file mode 100644
index 0000000..c5a9bca
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * Hive meta API client for Kylin
+ * @author shaoshi
+ *
+ */
+public class HiveClient {
+
+ protected HiveConf hiveConf = null;
+ protected Driver driver = null;
+ protected HiveMetaStoreClient metaStoreClient = null;
+
+ public HiveClient() {
+ hiveConf = new HiveConf(HiveClient.class);
+ }
+
+ public HiveClient(Map<String, String> configMap) {
+ this();
+ appendConfiguration(configMap);
+ }
+
+ public HiveConf getHiveConf() {
+ return hiveConf;
+ }
+
+ /**
+ * Get the hive ql driver to execute ddl or dml
+ * @return
+ */
+ private Driver getDriver() {
+ if (driver == null) {
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ }
+
+ return driver;
+ }
+
+ /**
+ * Append or overwrite the default hive client configuration; You need call this before invoke #executeHQL;
+ * @param configMap
+ */
+ public void appendConfiguration(Map<String, String> configMap) {
+ if (configMap != null && configMap.size() > 0) {
+ for (Entry<String, String> e : configMap.entrySet()) {
+ hiveConf.set(e.getKey(), e.getValue());
+ }
+ }
+ }
+
+ /**
+ *
+ * @param hql
+ * @throws CommandNeedRetryException
+ * @throws IOException
+ */
+ public void executeHQL(String hql) throws CommandNeedRetryException, IOException {
+ CommandProcessorResponse response = getDriver().run(hql);
+ int retCode = response.getResponseCode();
+ if (retCode != 0) {
+ String err = response.getErrorMessage();
+ throw new IOException("Failed to execute hql [" + hql + "], error message is: " + err);
+ }
+ }
+
+ public void executeHQL(String[] hqls) throws CommandNeedRetryException, IOException {
+ for (String sql : hqls)
+ executeHQL(sql);
+ }
+
+ private HiveMetaStoreClient getMetaStoreClient() throws Exception {
+ if (metaStoreClient == null) {
+ metaStoreClient = new HiveMetaStoreClient(hiveConf);
+ }
+ return metaStoreClient;
+ }
+
+ public Table getHiveTable(String database, String tableName) throws Exception {
+ return getMetaStoreClient().getTable(database, tableName);
+ }
+
+ public List<FieldSchema> getHiveTableFields(String database, String tableName) throws Exception {
+ return getMetaStoreClient().getFields(database, tableName);
+ }
+
+ public String getHiveTableLocation(String database, String tableName) throws Exception {
+ Table t = getHiveTable(database, tableName);
+ return t.getSd().getLocation();
+ }
+
+ public long getFileSizeForTable(Table table) {
+ return getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE);
+ }
+
+ public long getFileNumberForTable(Table table) {
+ return getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES);
+ }
+
+ /**
+ * COPIED FROM org.apache.hadoop.hive.ql.stats.StatsUtil for backward compatibility
+ *
+ * Get basic stats of table
+ * @param table
+ * - table
+ * @param statType
+ * - type of stats
+ * @return value of stats
+ */
+ public static long getBasicStatForTable(org.apache.hadoop.hive.ql.metadata.Table table, String statType) {
+ Map<String, String> params = table.getParameters();
+ long result = 0;
+
+ if (params != null) {
+ try {
+ result = Long.parseLong(params.get(statType));
+ } catch (NumberFormatException e) {
+ result = 0;
+ }
+ }
+ return result;
+ }
+
+ public boolean isNativeTable(String database, String tableName) throws Exception{
+ return !MetaStoreUtils.isNonNativeTable(getMetaStoreClient().getTable(database, tableName));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 951602e..013b009 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -43,6 +43,44 @@
<!-- Env & Test -->
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${hbase-hadoop2.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
new file mode 100644
index 0000000..d1bb216
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.kylin.common.persistence.StorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates and caches HBase {@link Configuration}s and shared {@link HConnection}s keyed by
+ * Kylin storage URL, and offers small table-administration helpers.
+ *
+ * @author yangli9
+ */
+public class HBaseConnection {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
+
+    /**
+     * Matches what follows the "hbase" prefix of a storage URL:
+     * {@code :quorum[,quorum...]:port[:znodeParent]}. The znode parent part is optional.
+     */
+    private static final Pattern URL_PATTERN = Pattern.compile("[:]((?:[\\w\\-.]+)(?:\\,[\\w\\-.]+)*)[:](\\d+)(?:[:](.+))?");
+
+    private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
+    private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>();
+
+    static {
+        // Close every pooled connection when the JVM exits.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                for (HConnection conn : ConnPool.values()) {
+                    try {
+                        conn.close();
+                    } catch (IOException e) {
+                        // stderr rather than the logger: logging may already be shut down by its own hook
+                        e.printStackTrace();
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Builds an HBase configuration from a Kylin storage URL, e.g.
+     * 0. hbase (recommended way)
+     * 1. hbase:zk-1.hortonworks.com,zk-2.hortonworks.com,zk-3.hortonworks.com:2181:/hbase-unsecure
+     * 2. hbase:zk-1.hortonworks.com,zk-2.hortonworks.com,zk-3.hortonworks.com:2181
+     * 3. hbase:zk-1.hortonworks.com:2181:/hbase-unsecure
+     * 4. hbase:zk-1.hortonworks.com:2181
+     *
+     * @param url storage URL; null/empty or plain "hbase" yields the default configuration
+     * @return a new configuration with reduced client retries and any quorum/port/znode parent from the URL
+     * @throws IllegalArgumentException if the URL is malformed or a quorum host cannot be resolved
+     */
+    public static Configuration newHBaseConfiguration(String url) {
+        Configuration conf = HBaseConfiguration.create();
+        // reduce rpc retry
+        conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
+        conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
+        conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+        if (StringUtils.isEmpty(url)) {
+            return conf;
+        }
+
+        if (!url.startsWith("hbase")) {
+            throw new IllegalArgumentException("hbase url must start with 'hbase' -- " + url);
+        }
+
+        // chop off "hbase"; keep the full url for messages
+        String address = StringUtils.substringAfter(url, "hbase");
+        if (StringUtils.isEmpty(address)) {
+            return conf;
+        }
+
+        Matcher m = URL_PATTERN.matcher(address);
+        if (!m.matches()) {
+            throw new IllegalArgumentException("HBase URL '" + url + "' is invalid, expected url is like '" + "hbase:domain.com:2181:/hbase-unsecure" + "'");
+        }
+
+        logger.debug("Creating hbase conf by parsing -- " + url);
+
+        String quorums = m.group(1);
+        String quorum = null;
+        try {
+            // fail fast on hosts that cannot be resolved
+            for (String s : quorums.split(",")) {
+                quorum = s;
+                InetAddress.getByName(quorum);
+            }
+        } catch (UnknownHostException e) {
+            throw new IllegalArgumentException("Zookeeper quorum is invalid: " + quorum + "; urlString=" + url, e);
+        }
+        conf.set(HConstants.ZOOKEEPER_QUORUM, quorums);
+        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, m.group(2));
+
+        // The znode parent is optional (forms 2 and 4). Configuration.set rejects null values,
+        // so only override HBase's default when one was given.
+        String znodePath = m.group(3);
+        if (znodePath != null) {
+            conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodePath);
+        }
+
+        return conf;
+    }
+
+    /**
+     * Returns a pooled connection for the URL, creating it (and its cached configuration) if needed.
+     * The returned HConnection can be shared by multiple threads and does not require close().
+     *
+     * @throws StorageException if the connection cannot be opened
+     */
+    @SuppressWarnings("resource")
+    public static HConnection get(String url) {
+        // find configuration
+        Configuration conf = ConfigCache.get(url);
+        if (conf == null) {
+            conf = newHBaseConfiguration(url);
+            ConfigCache.put(url, conf);
+        }
+
+        HConnection connection = ConnPool.get(url);
+        try {
+            while (true) {
+                // No double-checked locking: recreating a connection is not a big issue.
+                if (connection == null || connection.isClosed()) {
+                    logger.info("connection is null or closed, creating a new one");
+                    connection = HConnectionManager.createConnection(conf);
+                    ConnPool.put(url, connection);
+                }
+
+                if (connection == null || connection.isClosed()) {
+                    Thread.sleep(10000);// wait a while and retry
+                } else {
+                    break;
+                }
+            }
+        } catch (Throwable t) {
+            if (t instanceof InterruptedException) {
+                // keep the interrupt visible to callers
+                Thread.currentThread().interrupt();
+            }
+            logger.error("Error when open connection " + url, t);
+            throw new StorageException("Error when open connection " + url, t);
+        }
+
+        return connection;
+    }
+
+    /** @return whether {@code tableName} exists on the cluster behind {@code conn} */
+    public static boolean tableExists(HConnection conn, String tableName) throws IOException {
+        HBaseAdmin hbase = new HBaseAdmin(conn);
+        try {
+            return hbase.tableExists(TableName.valueOf(tableName));
+        } finally {
+            hbase.close();
+        }
+    }
+
+    /** @return whether {@code tableName} exists on the cluster addressed by {@code hbaseUrl} */
+    public static boolean tableExists(String hbaseUrl, String tableName) throws IOException {
+        return tableExists(HBaseConnection.get(hbaseUrl), tableName);
+    }
+
+    /** Creates the table on the cluster addressed by {@code hbaseUrl} unless it already exists. */
+    public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException {
+        createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
+    }
+
+    /** Disables (if enabled) and deletes the table on the cluster addressed by {@code hbaseUrl}, if it exists. */
+    public static void deleteTable(String hbaseUrl, String tableName) throws IOException {
+        deleteTable(HBaseConnection.get(hbaseUrl), tableName);
+    }
+
+    /**
+     * Creates {@code tableName} with the given column families (each marked in-memory)
+     * unless the table already exists.
+     */
+    public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
+        HBaseAdmin hbase = new HBaseAdmin(conn);
+
+        try {
+            // reuse this admin instead of opening a second one via tableExists(conn, ...)
+            if (hbase.tableExists(TableName.valueOf(tableName))) {
+                logger.debug("HTable '" + tableName + "' already exists");
+                return;
+            }
+
+            logger.debug("Creating HTable '" + tableName + "'");
+
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+
+            if (null != families && families.length > 0) {
+                for (String family : families) {
+                    HColumnDescriptor fd = new HColumnDescriptor(family);
+                    fd.setInMemory(true); // metadata tables are best in memory
+                    desc.addFamily(fd);
+                }
+            }
+            hbase.createTable(desc);
+
+            logger.debug("HTable '" + tableName + "' created");
+        } finally {
+            hbase.close();
+        }
+    }
+
+    /** Disables (if enabled) and deletes {@code tableName}; does nothing if it does not exist. */
+    public static void deleteTable(HConnection conn, String tableName) throws IOException {
+        HBaseAdmin hbase = new HBaseAdmin(conn);
+
+        try {
+            if (!hbase.tableExists(TableName.valueOf(tableName))) {
+                logger.debug("HTable '" + tableName + "' does not exist");
+                return;
+            }
+
+            logger.debug("delete HTable '" + tableName + "'");
+
+            if (hbase.isTableEnabled(tableName)) {
+                hbase.disableTable(tableName);
+            }
+            hbase.deleteTable(tableName);
+
+            logger.debug("HTable '" + tableName + "' deleted");
+        } finally {
+            hbase.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseRegionSizeCalculator.java
new file mode 100644
index 0000000..bd41a99
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseRegionSizeCalculator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+/** This class will come with HBase 2.0 in package org.apache.hadoop.hbase.util **/
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseRegionSizeCalculator {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseRegionSizeCalculator.class);
+
+    /**
+     * Maps each region name to its size in bytes.
+     **/
+    private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
+    /** Configuration key that turns the calculation on or off; read with a default of true. */
+    static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
+
+    /**
+     * Computes the size of each region of the given table.
+     *
+     * @param table table whose regions are measured
+     * @throws IOException if region locations or cluster status cannot be read
+     */
+    public HBaseRegionSizeCalculator(HTable table) throws IOException {
+        this(table, new HBaseAdmin(table.getConfiguration()));
+    }
+
+    /**
+     * Constructor for unit testing. {@code hBaseAdmin} is always closed before this returns.
+     */
+    HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
+        try {
+            if (!enabled(table.getConfiguration())) {
+                logger.info("Region size calculation disabled.");
+                return;
+            }
+
+            // HBase's Bytes.toString decodes UTF-8, unlike new String(byte[]) which uses the platform charset.
+            logger.info("Calculating region sizes for table \"" + Bytes.toString(table.getTableName()) + "\".");
+
+            // Collect the names of the regions that belong to this table.
+            Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+            Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+            for (HRegionInfo regionInfo : tableRegionInfos) {
+                tableRegions.add(regionInfo.getRegionName());
+            }
+
+            ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+            Collection<ServerName> servers = clusterStatus.getServers();
+            final long megaByte = 1024L * 1024L;
+
+            // Iterate all cluster regions, keep the ones of our table and record their size.
+            // Sizes are derived from a megabyte count, so they are MB-granular.
+            for (ServerName serverName : servers) {
+                ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+                for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+                    byte[] regionId = regionLoad.getName();
+                    if (tableRegions.contains(regionId)) {
+                        long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
+                        sizeMap.put(regionId, regionSizeBytes);
+                    }
+                }
+            }
+        } finally {
+            hBaseAdmin.close();
+        }
+    }
+
+    /** @return the value of {@link #ENABLE_REGIONSIZECALCULATOR}, defaulting to true */
+    boolean enabled(Configuration configuration) {
+        return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
+    }
+
+    /**
+     * Returns size of given region in bytes. Returns 0 if region was not found.
+     **/
+    public long getRegionSize(byte[] regionId) {
+        Long size = sizeMap.get(regionId);
+        if (size == null) {
+            // toStringBinary prints printable bytes as text and escapes the rest
+            logger.info("Unknown region:" + Bytes.toStringBinary(regionId));
+            return 0;
+        }
+        return size;
+    }
+
+    /** @return a read-only view of region name to size in bytes */
+    public Map<byte[], Long> getRegionSizeMap() {
+        return Collections.unmodifiableMap(sizeMap);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
new file mode 100644
index 0000000..0e897c8
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import com.google.common.collect.Lists;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.engine.mr.HadoopUtil;
+
+import java.io.*;
+import java.util.*;
+
+public class HBaseResourceStore extends ResourceStore {
+
+ private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
+ private static final String FAMILY = "f";
+ private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
+ private static final String COLUMN = "c";
+ private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN);
+ private static final String COLUMN_TS = "t";
+ private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS);
+
+ private static final Map<String, String> TABLE_SUFFIX_MAP = new LinkedHashMap<String, String>();
+
+ static {
+ TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube");
+ TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict");
+ TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex");
+ TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job");
+ TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output");
+ TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj");
+ TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot");
+ TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE
+ }
+
+ final String tableNameBase;
+ final String hbaseUrl;
+
+ // final Map<String, String> tableNameMap; // path prefix ==> HBase table name
+
+ private HConnection getConnection() throws IOException {
+ return HBaseConnection.get(hbaseUrl);
+ }
+
+ public HBaseResourceStore(KylinConfig kylinConfig) throws IOException {
+ super(kylinConfig);
+
+ String metadataUrl = kylinConfig.getMetadataUrl();
+ // split TABLE@HBASE_URL
+ int cut = metadataUrl.indexOf('@');
+ tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+ hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
+
+ createHTableIfNeeded(getAllInOneTableName());
+
+ // tableNameMap = new LinkedHashMap<String, String>();
+ // for (Entry<String, String> entry : TABLE_SUFFIX_MAP.entrySet()) {
+ // String pathPrefix = entry.getKey();
+ // String tableName = tableNameBase + entry.getValue();
+ // tableNameMap.put(pathPrefix, tableName);
+ // createHTableIfNeeded(tableName);
+ // }
+
+ }
+
+ private void createHTableIfNeeded(String tableName) throws IOException {
+ HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY);
+ }
+
+ private String getAllInOneTableName() {
+ return tableNameBase;
+ }
+
+ @Override
+ protected ArrayList<String> listResourcesImpl(String resPath) throws IOException {
+ assert resPath.startsWith("/");
+ String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/";
+ byte[] startRow = Bytes.toBytes(lookForPrefix);
+ byte[] endRow = Bytes.toBytes(lookForPrefix);
+ endRow[endRow.length - 1]++;
+
+ ArrayList<String> result = new ArrayList<String>();
+
+ HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Scan scan = new Scan(startRow, endRow);
+ scan.setFilter(new KeyOnlyFilter());
+ try {
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result r : scanner) {
+ String path = Bytes.toString(r.getRow());
+ assert path.startsWith(lookForPrefix);
+ int cut = path.indexOf('/', lookForPrefix.length());
+ String child = cut < 0 ? path : path.substring(0, cut);
+ if (result.contains(child) == false)
+ result.add(child);
+ }
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ // return null to indicate not a folder
+ return result.isEmpty() ? null : result;
+ }
+
+ @Override
+ protected boolean existsImpl(String resPath) throws IOException {
+ Result r = getByScan(resPath, null, null);
+ return r != null;
+ }
+
+ @Override
+ protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
+ byte[] startRow = Bytes.toBytes(rangeStart);
+ byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
+
+ Scan scan = new Scan(startRow, endRow);
+ scan.addColumn(B_FAMILY, B_COLUMN_TS);
+ scan.addColumn(B_FAMILY, B_COLUMN);
+
+ HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ List<RawResource> result = Lists.newArrayList();
+ try {
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result r : scanner) {
+ result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
+ }
+ } catch (IOException e) {
+ for (RawResource rawResource : result) {
+ IOUtils.closeQuietly(rawResource.resource);
+ }
+ throw e;
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ return result;
+ }
+
+ private InputStream getInputStream(String resPath, Result r) throws IOException {
+ if (r == null) {
+ return null;
+ }
+ byte[] value = r.getValue(B_FAMILY, B_COLUMN);
+ if (value.length == 0) {
+ Path redirectPath = bigCellHDFSPath(resPath);
+ Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ return fileSystem.open(redirectPath);
+ } else {
+ return new ByteArrayInputStream(value);
+ }
+ }
+
+ private long getTimestamp(Result r) {
+ if (r == null) {
+ return 0;
+ } else {
+ return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS));
+ }
+ }
+
+ @Override
+ protected InputStream getResourceImpl(String resPath) throws IOException {
+ Result r = getByScan(resPath, B_FAMILY, B_COLUMN);
+ return getInputStream(resPath, r);
+ }
+
+ @Override
+ protected long getResourceTimestampImpl(String resPath) throws IOException {
+ Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS);
+ return getTimestamp(r);
+ }
+
+ @Override
+ protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ IOUtils.copy(content, bout);
+ bout.close();
+
+ HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ try {
+ byte[] row = Bytes.toBytes(resPath);
+ Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
+
+ table.put(put);
+ table.flushCommits();
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ @Override
+ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
+ HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ try {
+ byte[] row = Bytes.toBytes(resPath);
+ byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
+ Put put = buildPut(resPath, newTS, row, content, table);
+
+ boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
+ if (!ok) {
+ long real = getResourceTimestamp(resPath);
+ throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
+ }
+
+ table.flushCommits();
+
+ return newTS;
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ @Override
+ protected void deleteResourceImpl(String resPath) throws IOException {
+ HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ try {
+ Delete del = new Delete(Bytes.toBytes(resPath));
+ table.delete(del);
+ table.flushCommits();
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ @Override
+ protected String getReadableResourcePathImpl(String resPath) {
+ return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+ }
+
+ private Result getByScan(String path, byte[] family, byte[] column) throws IOException {
+ byte[] startRow = Bytes.toBytes(path);
+ byte[] endRow = plusZero(startRow);
+
+ Scan scan = new Scan(startRow, endRow);
+ if (family == null || column == null) {
+ scan.setFilter(new KeyOnlyFilter());
+ } else {
+ scan.addColumn(family, column);
+ }
+
+ HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ try {
+ ResultScanner scanner = table.getScanner(scan);
+ Result result = null;
+ for (Result r : scanner) {
+ result = r;
+ }
+ return result == null || result.isEmpty() ? null : result;
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ private byte[] plusZero(byte[] startRow) {
+ byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1);
+ endRow[endRow.length - 1] = 0;
+ return endRow;
+ }
+
+ private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
+ Path redirectPath = bigCellHDFSPath(resPath);
+ Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ if (fileSystem.exists(redirectPath)) {
+ fileSystem.delete(redirectPath, true);
+ }
+
+ FSDataOutputStream out = fileSystem.create(redirectPath);
+
+ try {
+ out.write(largeColumn);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+
+ return redirectPath;
+ }
+
+ public Path bigCellHDFSPath(String resPath) {
+ String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory();
+ Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath);
+ return redirectPath;
+ }
+
+ private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException {
+ int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
+ if (content.length > kvSizeLimit) {
+ writeLargeCellToHdfs(resPath, content, table);
+ content = BytesUtil.EMPTY_BYTE_ARRAY;
+ }
+
+ Put put = new Put(row);
+ put.add(B_FAMILY, B_COLUMN, content);
+ put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
+
+ return put;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ZookeeperJobLock.java
new file mode 100644
index 0000000..5b13eb9
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ZookeeperJobLock.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.storage.hbase;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.lock.JobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ZookeeperJobLock implements JobLock {
+ private Logger logger = LoggerFactory.getLogger(ZookeeperJobLock.class);
+
+ private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+
+ private String scheduleID;
+ private InterProcessMutex sharedLock;
+ private CuratorFramework zkClient;
+
+ @Override
+ public boolean lock() {
+ this.scheduleID = schedulerId();
+ String ZKConnectString = getZKConnectString();
+ if (StringUtils.isEmpty(ZKConnectString)) {
+ throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+ }
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ this.zkClient = CuratorFrameworkFactory.newClient(ZKConnectString, retryPolicy);
+ this.zkClient.start();
+ this.sharedLock = new InterProcessMutex(zkClient, this.scheduleID);
+ boolean hasLock = false;
+ try {
+ hasLock = sharedLock.acquire(3, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ logger.warn("error acquire lock", e);
+ }
+ if (!hasLock) {
+ logger.warn("fail to acquire lock, scheduler has not been started");
+ zkClient.close();
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ releaseLock();
+ }
+
+ private String getZKConnectString() {
+ Configuration conf = HBaseConnection.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+ }
+
+ private void releaseLock() {
+ try {
+ if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+ // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
+ if (zkClient.checkExists().forPath(scheduleID) != null) {
+ zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(scheduleID);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("error release lock:" + scheduleID);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String schedulerId() {
+ return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d2456215/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/HBaseMetadataTestCase.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/HBaseMetadataTestCase.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/HBaseMetadataTestCase.java
new file mode 100644
index 0000000..0926604
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/HBaseMetadataTestCase.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+
+import java.io.File;
+
+/**
+ * @author ysong1
+ */
+public class HBaseMetadataTestCase extends AbstractKylinTestCase {
+
+ static {
+ if (useSandbox()) {
+ try {
+ ClassUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void createTestMetadata() throws Exception {
+ staticCreateTestMetadata();
+ }
+
+ @Override
+ public void cleanupTestMetadata() {
+ staticCleanupTestMetadata();
+ }
+
+ public static void staticCreateTestMetadata() throws Exception {
+ if (useSandbox()) {
+ staticCreateTestMetadata(SANDBOX_TEST_DATA);
+ } else {
+ staticCreateTestMetadata(MINICLUSTER_TEST_DATA);
+ HBaseMiniclusterHelper.startupMinicluster();
+ }
+
+ }
+ public static void staticCreateTestMetadata(String kylinConfigFolder) {
+
+ KylinConfig.destoryInstance();
+
+ if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null)
+ System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder);
+
+ }
+
+ public static boolean useSandbox() {
+ String useSandbox = System.getProperty("useSandbox");
+ return Boolean.parseBoolean(useSandbox);
+ }
+
+}