You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:12 UTC
[04/28] incubator-kylin git commit: KYLIN-875 Split job module into
'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
new file mode 100644
index 0000000..ed67109
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -0,0 +1,85 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI;
+import org.apache.kylin.storage.hbase.util.LZOSupportnessChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
/**
 * Utility that creates the HBase table backing a cube segment.
 *
 * <p>The table gets one column family per entry in the cube's HBase mapping,
 * is pre-split on the supplied split keys, and is tagged with this Kylin
 * instance's metadata URL prefix so cleanup tools can identify ownership.
 */
public class CubeHTableUtil {

    private static final Logger logger = LoggerFactory.getLogger(CubeHTableUtil.class);

    /**
     * Creates the HTable for a cube segment.
     *
     * @param cubeDesc  cube design; supplies the column family layout
     * @param tableName name of the HBase table to create; must not already exist
     * @param splitKeys region boundary keys used to pre-split the new table
     * @throws IOException      if the HBase admin operations fail
     * @throws RuntimeException if a table with the given name already exists
     */
    public static void createHTable(CubeDesc cubeDesc, String tableName, byte[][] splitKeys) throws IOException {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();

        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        // Regions are pre-split via splitKeys; ConstantSizeRegionSplitPolicy keeps
        // HBase from re-splitting them based on store-file size heuristics.
        // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
        tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
        // Tag the table so ownership can be traced back to this Kylin instance.
        tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());

        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        try {
            if (User.isHBaseSecurityEnabled(conf)) {
                // add coprocessor for bulk load
                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
            }

            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
                HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
                // Cube cells are written once and never updated; one version suffices.
                cf.setMaxVersions(1);

                if (LZOSupportnessChecker.getSupportness()) {
                    logger.info("hbase will use lzo to compress cube data");
                    cf.setCompressionType(Compression.Algorithm.LZO);
                } else {
                    logger.info("hbase will not use lzo to compress cube data");
                }

                cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
                cf.setInMemory(false);
                cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
                tableDesc.addFamily(cf);
            }

            if (admin.tableExists(tableName)) {
                // admin.disableTable(tableName);
                // admin.deleteTable(tableName);
                // Never overwrite an existing table: fail loudly instead of silently dropping data.
                throw new RuntimeException("HBase table " + tableName + " exists!");
            }

            // Attach the Kylin observer coprocessor before creation so it is active from the start.
            DeployCoprocessorCLI.deployCoprocessor(tableDesc);

            admin.createTable(tableDesc, splitKeys);
            Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
            logger.info("create hbase table " + tableName + " done.");
        } catch (Exception e) {
            logger.error("Failed to create HTable", e);
            throw e;
        } finally {
            admin.close();
        }

    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
new file mode 100644
index 0000000..e42f709
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -0,0 +1,135 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import java.util.List;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class HBaseMRSteps extends JobBuilderSupport {
+
+ public HBaseMRSteps(CubeSegment seg) {
+ super(seg, null);
+ }
+
+ public void addSaveCuboidToHTableSteps(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
+ String jobId = jobFlow.getId();
+
+ // calculate key distribution
+ jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
+ // create htable step
+ jobFlow.addTask(createCreateHTableStep(jobId));
+ // generate hfiles step
+ jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
+ // bulk load step
+ jobFlow.addTask(createBulkLoadStep(jobId));
+ }
+
+ public MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
+ MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
+ rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, seg);
+ appendExecCmdParameters(cmd, "input", inputPath);
+ appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
+
+ rowkeyDistributionStep.setMapReduceParams(cmd.toString());
+ rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
+ return rowkeyDistributionStep;
+ }
+
+ public HadoopShellExecutable createCreateHTableStep(String jobId) {
+ return createCreateHTableStep(jobId, false);
+ }
+
+ public HadoopShellExecutable createCreateHTableStepWithStats(String jobId) {
+ return createCreateHTableStep(jobId, true);
+ }
+
+ private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) {
+ HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
+ createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+ StringBuilder cmd = new StringBuilder();
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+
+ createHtableStep.setJobParams(cmd.toString());
+ createHtableStep.setJobClass(CreateHTableJob.class);
+
+ return createHtableStep;
+ }
+
+ public MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
+ MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+ createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, seg);
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "input", inputPath);
+ appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
+
+ createHFilesStep.setMapReduceParams(cmd.toString());
+ createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
+ createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+
+ return createHFilesStep;
+ }
+
+ public HadoopShellExecutable createBulkLoadStep(String jobId) {
+ HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+ bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
+
+ StringBuilder cmd = new StringBuilder();
+ appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+
+ bulkLoadStep.setJobParams(cmd.toString());
+ bulkLoadStep.setJobClass(BulkLoadJob.class);
+
+ return bulkLoadStep;
+ }
+
+ public MergeGCStep createMergeGCStep() {
+ MergeGCStep result = new MergeGCStep();
+ result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ result.setOldHTables(getMergingHTables());
+ return result;
+ }
+
+ public List<String> getMergingHTables() {
+ final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
+ final List<String> mergingHTables = Lists.newArrayList();
+ for (CubeSegment merging : mergingSegments) {
+ mergingHTables.add(merging.getStorageLocationIdentifier());
+ }
+ return mergingHTables;
+ }
+
+ public String getHFilePath(String jobId) {
+ return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
+ }
+
+ public String getRowkeyDistributionOutputPath(String jobId) {
+ return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
new file mode 100644
index 0000000..b2d691e
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
@@ -0,0 +1,73 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import com.google.common.collect.Lists;
+
+public class InMemKeyValueCreator {
+ byte[] cfBytes;
+ byte[] qBytes;
+ long timestamp;
+
+
+ MeasureCodec codec;
+ Object[] colValues;
+ ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+ int startPosition = 0;
+
+ public InMemKeyValueCreator(HBaseColumnDesc colDesc, int startPosition) {
+
+ cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
+ qBytes = Bytes.toBytes(colDesc.getQualifier());
+ timestamp = System.currentTimeMillis();
+
+ List<MeasureDesc> measures = Lists.newArrayList();
+ for (MeasureDesc measure : colDesc.getMeasures()) {
+ measures.add(measure);
+ }
+ codec = new MeasureCodec(measures);
+ colValues = new Object[measures.size()];
+
+ this.startPosition = startPosition;
+
+ }
+
+ public KeyValue create(Text key, Object[] measureValues) {
+ return create(key.getBytes(), 0, key.getLength(), measureValues);
+ }
+
+ public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, Object[] measureValues) {
+ for (int i = 0; i < colValues.length; i++) {
+ colValues[i] = measureValues[startPosition + i];
+ }
+
+ valueBuf.clear();
+ codec.encode(colValues, valueBuf);
+
+ return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
+ }
+
+
+ public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
+ return new KeyValue(keyBytes, keyOffset, keyLength, //
+ cfBytes, 0, cfBytes.length, //
+ qBytes, 0, qBytes.length, //
+ timestamp, KeyValue.Type.Put, //
+ value, voffset, vlen);
+ }
+
+ public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
+ return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
new file mode 100644
index 0000000..3e49e27
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
@@ -0,0 +1,104 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class KeyValueCreator {
+ byte[] cfBytes;
+ byte[] qBytes;
+ long timestamp;
+
+ int[] refIndex;
+ MeasureDesc[] refMeasures;
+
+ MeasureCodec codec;
+ Object[] colValues;
+ ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+ public boolean isFullCopy;
+
+ public KeyValueCreator(CubeDesc cubeDesc, HBaseColumnDesc colDesc) {
+
+ cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
+ qBytes = Bytes.toBytes(colDesc.getQualifier());
+ timestamp = System.currentTimeMillis();
+
+ List<MeasureDesc> measures = cubeDesc.getMeasures();
+ String[] measureNames = getMeasureNames(cubeDesc);
+ String[] refs = colDesc.getMeasureRefs();
+
+ refIndex = new int[refs.length];
+ refMeasures = new MeasureDesc[refs.length];
+ for (int i = 0; i < refs.length; i++) {
+ refIndex[i] = indexOf(measureNames, refs[i]);
+ refMeasures[i] = measures.get(refIndex[i]);
+ }
+
+ codec = new MeasureCodec(refMeasures);
+ colValues = new Object[refs.length];
+
+ isFullCopy = true;
+ for (int i = 0; i < measures.size(); i++) {
+ if (refIndex.length <= i || refIndex[i] != i)
+ isFullCopy = false;
+ }
+ }
+
+ public KeyValue create(Text key, Object[] measureValues) {
+ return create(key.getBytes(), 0, key.getLength(), measureValues);
+ }
+
+ public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, Object[] measureValues) {
+ for (int i = 0; i < colValues.length; i++) {
+ colValues[i] = measureValues[refIndex[i]];
+ }
+
+ valueBuf.clear();
+ codec.encode(colValues, valueBuf);
+
+ return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
+ }
+
+
+ public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
+ return new KeyValue(keyBytes, keyOffset, keyLength, //
+ cfBytes, 0, cfBytes.length, //
+ qBytes, 0, qBytes.length, //
+ timestamp, KeyValue.Type.Put, //
+ value, voffset, vlen);
+ }
+
+ public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
+ return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
+ }
+
+ private int indexOf(String[] measureNames, String ref) {
+ for (int i = 0; i < measureNames.length; i++)
+ if (measureNames[i].equalsIgnoreCase(ref))
+ return i;
+
+ throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames));
+ }
+
+ private String[] getMeasureNames(CubeDesc cubeDesc) {
+ List<MeasureDesc> measures = cubeDesc.getMeasures();
+ String[] result = new String[measures.size()];
+ for (int i = 0; i < measures.size(); i++)
+ result[i] = measures.get(i).getName();
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
new file mode 100644
index 0000000..df42560
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
/**
 * Job step that drops HBase tables that are no longer needed after a segment
 * merge. Table names are carried in the executable's parameters; failures are
 * deliberately non-fatal so a leftover table never blocks the merge job.
 */
public class MergeGCStep extends AbstractExecutable {

    // Parameter key holding a comma-separated list of HTable names to drop.
    private static final String OLD_HTABLES = "oldHTables";

    private static final Logger logger = LoggerFactory.getLogger(MergeGCStep.class);

    public MergeGCStep() {
        super();
    }

    /**
     * Disables and deletes each configured HTable that exists and is tagged as
     * belonging to this Kylin instance. Returns SUCCEED when all drops complete
     * (or there was nothing to drop); returns ERROR only on I/O failure, which
     * callers treat as non-blocking.
     */
    @Override
    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {

        StringBuffer output = new StringBuffer();

        List<String> oldTables = getOldHTables();
        if (oldTables != null && oldTables.size() > 0) {
            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
            Configuration conf = HBaseConfiguration.create();
            HBaseAdmin admin = null;
            try {
                admin = new HBaseAdmin(conf);
                for (String table : oldTables) {
                    if (admin.tableExists(table)) {
                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
                        // Only drop tables tagged with this instance's metadata prefix;
                        // other instances' tables are left untouched.
                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
                            if (admin.isTableEnabled(table)) {
                                admin.disableTable(table);
                            }
                            admin.deleteTable(table);
                            logger.debug("Dropped htable: " + table);
                            output.append("HBase table " + table + " is dropped. \n");
                        } else {
                            logger.debug("Skip htable: " + table);
                            output.append("Skip htable: " + table + ". \n");
                        }
                    }
                }

            } catch (IOException e) {
                output.append("Got error when drop HBase table, exiting... \n");
                // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
                return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
            } finally {
                if (admin != null)
                    try {
                        admin.close();
                    } catch (IOException e) {
                        logger.error(e.getLocalizedMessage());
                    }
            }
        }

        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
    }

    /** Stores the table names to drop as a comma-joined executable parameter. */
    public void setOldHTables(List<String> ids) {
        setParam(OLD_HTABLES, StringUtils.join(ids, ","));
    }

    /** Reads the configured table names back; empty list when the parameter is unset. */
    private List<String> getOldHTables() {
        final String ids = getParam(OLD_HTABLES);
        if (ids != null) {
            final String[] splitted = StringUtils.split(ids, ",");
            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
            for (String id : splitted) {
                result.add(id);
            }
            return result;
        } else {
            return Collections.emptyList();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
new file mode 100644
index 0000000..877b71b
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+
+/**
+ */
+public class RowValueDecoder implements Cloneable {
+
+ private final HBaseColumnDesc hbaseColumn;
+ private final byte[] hbaseColumnFamily;
+ private final byte[] hbaseColumnQualifier;
+
+ private final MeasureCodec codec;
+ private final BitSet projectionIndex;
+ private final MeasureDesc[] measures;
+ private Object[] values;
+
+ public RowValueDecoder(HBaseColumnDesc hbaseColumn) {
+ this.hbaseColumn = hbaseColumn;
+ this.hbaseColumnFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
+ this.hbaseColumnQualifier = Bytes.toBytes(hbaseColumn.getQualifier());
+ this.projectionIndex = new BitSet();
+ this.measures = hbaseColumn.getMeasures();
+ this.codec = new MeasureCodec(measures);
+ this.values = new Object[measures.length];
+ }
+
+ public void decode(Result hbaseRow) {
+ decode(hbaseRow, true);
+ }
+
+ public void decode(Result hbaseRow, boolean convertToJavaObject) {
+ decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
+ }
+
+ public void decode(byte[] bytes) {
+ decode(bytes, true);
+ }
+
+ public void decode(byte[] bytes, boolean convertToJavaObject) {
+ decode(ByteBuffer.wrap(bytes), convertToJavaObject);
+ }
+
+ private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
+ codec.decode(buffer, values);
+ if (convertToJavaObject) {
+ convertToJavaObjects(values, values, convertToJavaObject);
+ }
+ }
+
+ private void convertToJavaObjects(Object[] mapredObjs, Object[] results, boolean convertToJavaObject) {
+ for (int i = 0; i < mapredObjs.length; i++) {
+ Object o = mapredObjs[i];
+
+ if (o instanceof LongMutable)
+ o = ((LongMutable) o).get();
+ else if (o instanceof DoubleMutable)
+ o = ((DoubleMutable) o).get();
+
+ results[i] = o;
+ }
+ }
+
+ public void setIndex(int bitIndex) {
+ projectionIndex.set(bitIndex);
+ }
+
+ public HBaseColumnDesc getHBaseColumn() {
+ return hbaseColumn;
+ }
+
+ public BitSet getProjectionIndex() {
+ return projectionIndex;
+ }
+
+ public Object[] getValues() {
+ return values;
+ }
+
+ public MeasureDesc[] getMeasures() {
+ return measures;
+ }
+
+ public boolean hasMemHungryCountDistinct() {
+ for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
+ FunctionDesc func = measures[i].getFunction();
+ if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static boolean hasMemHungryCountDistinct(Collection<RowValueDecoder> rowValueDecoders) {
+ for (RowValueDecoder decoder : rowValueDecoders) {
+ if (decoder.hasMemHungryCountDistinct())
+ return true;
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
new file mode 100644
index 0000000..5b3949f
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+
+/**
+ * @author yangli9
+ */
+public class DeployCoprocessorCLI {
+
+ private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
+
+ public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
+ public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
+
+ public static void main(String[] args) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration hconf = HBaseConnection.newHBaseConfiguration(kylinConfig.getStorageUrl());
+ FileSystem fileSystem = FileSystem.get(hconf);
+ HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
+
+ String localCoprocessorJar = new File(args[0]).getAbsolutePath();
+ logger.info("Identify coprocessor jar " + localCoprocessorJar);
+
+ List<String> tableNames = getHTableNames(kylinConfig);
+ logger.info("Identify tables " + tableNames);
+
+ Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
+ logger.info("Old coprocessor jar: " + oldJarPaths);
+
+ Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
+ logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
+
+ List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+
+ // Don't remove old jars, missing coprocessor jar will fail hbase
+ // removeOldJars(oldJarPaths, fileSystem);
+
+ hbaseAdmin.close();
+
+ logger.info("Processed " + processedTables);
+ logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
+ }
+
+ public static void deployCoprocessor(HTableDescriptor tableDesc) {
+ try {
+ initHTableCoprocessor(tableDesc);
+ logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
+
+ } catch (Exception ex) {
+ logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
+ logger.error("Will try creating the table without coprocessor.");
+ }
+ }
+
+ private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+ Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+ DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ }
+
+ public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+ logger.info("Add coprocessor on " + desc.getNameAsString());
+ desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
+ desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
+ }
+
+ public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+ logger.info("Disable " + tableName);
+ hbaseAdmin.disableTable(tableName);
+
+ logger.info("Unset coprocessor on " + tableName);
+ HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
+ desc.removeCoprocessor(OBSERVER_CLS_NAME);
+ }
+ while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
+ desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
+ }
+
+ addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ hbaseAdmin.modifyTable(tableName, desc);
+
+ logger.info("Enable " + tableName);
+ hbaseAdmin.enableTable(tableName);
+ }
+
+ private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+ List<String> processed = new ArrayList<String>();
+
+ for (String tableName : tableNames) {
+ try {
+ resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
+ processed.add(tableName);
+ } catch (IOException ex) {
+ logger.error("Error processing " + tableName, ex);
+ }
+ }
+ return processed;
+ }
+
+ public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+ FileStatus newestJar = null;
+ for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+ if (fileStatus.getPath().toString().endsWith(".jar")) {
+ if (newestJar == null) {
+ newestJar = fileStatus;
+ } else {
+ if (newestJar.getModificationTime() < fileStatus.getModificationTime())
+ newestJar = fileStatus;
+ }
+ }
+ }
+ if (newestJar == null)
+ return null;
+
+ Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
+ logger.info("The newest coprocessor is " + path.toString());
+ return path;
+ }
+
+ public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+ Path uploadPath = null;
+ File localCoprocessorFile = new File(localCoprocessorJar);
+
+ // check existing jars
+ if (oldJarPaths == null) {
+ oldJarPaths = new HashSet<String>();
+ }
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+ for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+ if (isSame(localCoprocessorFile, fileStatus)) {
+ uploadPath = fileStatus.getPath();
+ break;
+ }
+ String filename = fileStatus.getPath().toString();
+ if (filename.endsWith(".jar")) {
+ oldJarPaths.add(filename);
+ }
+ }
+
+ // upload if not existing
+ if (uploadPath == null) {
+ // figure out a unique new jar file name
+ Set<String> oldJarNames = new HashSet<String>();
+ for (String path : oldJarPaths) {
+ oldJarNames.add(new Path(path).getName());
+ }
+ String baseName = getBaseFileName(localCoprocessorJar);
+ String newName = null;
+ int i = 0;
+ while (newName == null) {
+ newName = baseName + "-" + (i++) + ".jar";
+ if (oldJarNames.contains(newName))
+ newName = null;
+ }
+
+ // upload
+ uploadPath = new Path(coprocessorDir, newName);
+ FileInputStream in = null;
+ FSDataOutputStream out = null;
+ try {
+ in = new FileInputStream(localCoprocessorFile);
+ out = fileSystem.create(uploadPath);
+ IOUtils.copy(in, out);
+ } finally {
+ IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(out);
+ }
+
+ fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+ }
+
+ uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+ return uploadPath;
+ }
+
+ private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
+ return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
+ }
+
+ private static String getBaseFileName(String localCoprocessorJar) {
+ File localJar = new File(localCoprocessorJar);
+ String baseName = localJar.getName();
+ if (baseName.endsWith(".jar"))
+ baseName = baseName.substring(0, baseName.length() - ".jar".length());
+ return baseName;
+ }
+
+ private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+ String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+ Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+ fileSystem.mkdirs(coprocessorDir);
+ return coprocessorDir;
+ }
+
+ private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
+ HashSet<String> result = new HashSet<String>();
+
+ for (String tableName : tableNames) {
+ HTableDescriptor tableDescriptor = null;
+ try {
+ tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ } catch (TableNotFoundException e) {
+ logger.warn("Table not found " + tableName, e);
+ continue;
+ }
+
+ Matcher keyMatcher;
+ Matcher valueMatcher;
+ for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
+ keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
+ if (!keyMatcher.matches()) {
+ continue;
+ }
+ valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
+ if (!valueMatcher.matches()) {
+ continue;
+ }
+
+ String jarPath = valueMatcher.group(1).trim();
+ String clsName = valueMatcher.group(2).trim();
+
+ if (OBSERVER_CLS_NAME.equals(clsName)) {
+ result.add(jarPath);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private static List<String> getHTableNames(KylinConfig config) {
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+
+ ArrayList<String> result = new ArrayList<String>();
+ for (CubeInstance cube : cubeMgr.listAllCubes()) {
+ for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+ String tableName = seg.getStorageLocationIdentifier();
+ if (StringUtils.isBlank(tableName) == false) {
+ result.add(tableName);
+ System.out.println("added new table: " + tableName);
+ }
+ }
+ }
+
+ for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
+ if (ii.getStatus() == RealizationStatusEnum.READY) {
+ for (IISegment seg : ii.getSegments()) {//streaming segment is never "READY"
+ String tableName = seg.getStorageLocationIdentifier();
+ if (StringUtils.isBlank(tableName) == false) {
+ result.add(tableName);
+ System.out.println("added new table: " + tableName);
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
new file mode 100644
index 0000000..ba0da00
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+/** This class will come with HBase 2.0 in package org.apache.hadoop.hbase.util **/
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Computes the per-region store-file size of an HTable by scanning the HBase
+ * cluster status (region loads per server). Sizes are MB-granular, converted
+ * to bytes. Can be turned off via "hbase.regionsizecalculator.enable".
+ */
+public class HBaseRegionSizeCalculator {
+
+ private static final Logger logger = LoggerFactory.getLogger(HBaseRegionSizeCalculator.class);
+
+ /**
+ * Maps each region to its size in bytes.
+ **/
+ private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
+ static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
+
+ /**
+ * Computes size of each region for table and given column families.
+ * */
+ public HBaseRegionSizeCalculator(HTable table) throws IOException {
+ this(table, new HBaseAdmin(table.getConfiguration()));
+ }
+
+ /** Constructor for unit testing */
+ HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
+
+ // NOTE: this constructor takes ownership of hBaseAdmin and always closes it,
+ // even the caller-supplied one used in unit tests.
+ try {
+ if (!enabled(table.getConfiguration())) {
+ logger.info("Region size calculation disabled.");
+ return;
+ }
+
+ logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+
+ // Get regions for table.
+ Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+ Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+ for (HRegionInfo regionInfo : tableRegionInfos) {
+ tableRegions.add(regionInfo.getRegionName());
+ }
+
+ ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+ Collection<ServerName> servers = clusterStatus.getServers();
+ final long megaByte = 1024L * 1024L;
+
+ // Iterate all cluster regions, filter regions from our table and
+ // compute their size.
+ for (ServerName serverName : servers) {
+ ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+
+ for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+ byte[] regionId = regionLoad.getName();
+
+ if (tableRegions.contains(regionId)) {
+
+ // Store-file size is reported in MB, so the result is MB-granular.
+ long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
+ sizeMap.put(regionId, regionSizeBytes);
+
+ // logger.info("Region " + regionLoad.getNameAsString()
+ // + " has size " + regionSizeBytes);
+ }
+ }
+ }
+ } finally {
+ hBaseAdmin.close();
+ }
+
+ }
+
+ // Defaults to enabled unless the config flag says otherwise.
+ boolean enabled(Configuration configuration) {
+ return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
+ }
+
+ /**
+ * Returns size of given region in bytes. Returns 0 if region was not found.
+ **/
+ public long getRegionSize(byte[] regionId) {
+ Long size = sizeMap.get(regionId);
+ if (size == null) {
+ logger.info("Unknown region:" + Arrays.toString(regionId));
+ return 0;
+ } else {
+ return size;
+ }
+ }
+
+ /** @return an unmodifiable view of region-name to size-in-bytes */
+ public Map<byte[], Long> getRegionSizeMap() {
+ return Collections.unmodifiableMap(sizeMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/LZOSupportnessChecker.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/LZOSupportnessChecker.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/LZOSupportnessChecker.java
new file mode 100644
index 0000000..3645bed
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/LZOSupportnessChecker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import org.apache.hadoop.hbase.util.CompressionTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * Checks whether the current environment supports LZO compression by running
+ * HBase's CompressionTest against a throw-away local file.
+ */
+public class LZOSupportnessChecker {
+    private static final Logger log = LoggerFactory.getLogger(LZOSupportnessChecker.class);
+
+    /** @return true if LZO compression works in this environment, false otherwise */
+    public static boolean getSupportness() {
+        File temp = null;
+        try {
+            temp = File.createTempFile("test", ".tmp");
+            CompressionTest.main(new String[] { "file://" + temp.getAbsolutePath(), "lzo" });
+        } catch (Exception e) {
+            log.warn("LZO support is disabled. Fail to compress file with lzo: " + e.toString());
+            return false;
+        } finally {
+            // Fix: remove the temp file instead of leaking one per invocation.
+            if (temp != null && !temp.delete()) {
+                temp.deleteOnExit();
+            }
+        }
+        return true;
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.out.println("LZO supported by current env? " + getSupportness());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
new file mode 100644
index 0000000..9985acd
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -0,0 +1,84 @@
+package org.apache.kylin.storage.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Distributed job-engine lock backed by ZooKeeper via Curator: only one
+ * scheduler per metadata-url prefix may hold the lock at a time.
+ */
+public class ZookeeperJobLock implements JobLock {
+    private Logger logger = LoggerFactory.getLogger(ZookeeperJobLock.class);
+
+    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+
+    private String scheduleID;
+    private InterProcessMutex sharedLock;
+    private CuratorFramework zkClient;
+
+    /**
+     * Tries to acquire the scheduler lock, waiting up to 3 seconds.
+     *
+     * @return true when the lock is held; false otherwise (the ZK client is
+     *         closed before returning false)
+     * @throws IllegalArgumentException when no ZooKeeper quorum is configured
+     */
+    @Override
+    public boolean lock() {
+        this.scheduleID = schedulerId();
+        // Fix: local variables use lowerCamelCase per Java convention.
+        String zkConnectString = getZKConnectString();
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        this.zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+        this.zkClient.start();
+        this.sharedLock = new InterProcessMutex(zkClient, this.scheduleID);
+        boolean hasLock = false;
+        try {
+            hasLock = sharedLock.acquire(3, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            logger.warn("error acquire lock", e);
+        }
+        if (!hasLock) {
+            logger.warn("fail to acquire lock, scheduler has not been started");
+            zkClient.close();
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void unlock() {
+        releaseLock();
+    }
+
+    /** ZK quorum host list + client port, read from the HBase configuration. */
+    private String getZKConnectString() {
+        Configuration conf = HBaseConnection.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    }
+
+    /** Deletes the lock znode (releasing the lock), then closes the ZK client. */
+    private void releaseLock() {
+        try {
+            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+                // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
+                if (zkClient.checkExists().forPath(scheduleID) != null) {
+                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(scheduleID);
+                }
+            }
+        } catch (Exception e) {
+            // Fix: pass the exception to the logger so the stack trace is not lost.
+            logger.error("error release lock:" + scheduleID, e);
+            throw new RuntimeException(e);
+        } finally {
+            // Fix: close the Curator client so the ZooKeeper connection is not leaked.
+            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+                zkClient.close();
+            }
+        }
+    }
+
+    private String schedulerId() {
+        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/CreateHTableTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/CreateHTableTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/CreateHTableTest.java
new file mode 100644
index 0000000..8b09f05
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/CreateHTableTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
+
+/**
+ * Verifies CreateHTableJob#getSplits against a checked-in partition list.
+ *
+ * @author George Song (ysong1)
+ *
+ */
+public class CreateHTableTest extends LocalFileMetadataTestCase {
+
+    private Configuration conf;
+
+    @Before
+    public void setup() throws Exception {
+        conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+        conf.set("mapred.job.tracker", "local");
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGetSplits() throws IllegalArgumentException, Exception {
+        String partitionFile = "src/test/resources/partition_list/part-r-00000";
+
+        CreateHTableJob job = new CreateHTableJob();
+        byte[][] splits = job.getSplits(conf, new Path(partitionFile));
+
+        // 497 split keys expected; spot-check the first and the last.
+        assertEquals(497, splits.length);
+        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 15, -1, 11, 51, -45, 2 }, splits[0]);
+        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 3, -1, -1, -54, -61, 109, -44, 1 }, splits[496]);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
new file mode 100644
index 0000000..5381187
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Integration test for basic HDFS operations under the Kylin working directory.
+ */
+public class ITHdfsOpsTest extends HBaseMetadataTestCase {
+
+    FileSystem fileSystem;
+
+    @Before
+    public void setup() throws Exception {
+
+        this.createTestMetadata();
+
+        Configuration hconf = new Configuration();
+
+        fileSystem = FileSystem.get(hconf);
+    }
+
+    @Test
+    public void TestPath() throws IOException {
+        String hdfsWorkingDirectory = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+        Path coprocessorDir = new Path(hdfsWorkingDirectory, "test");
+        fileSystem.mkdirs(coprocessorDir);
+
+        Path newFile = new Path(coprocessorDir, "test_file");
+        newFile = newFile.makeQualified(fileSystem.getUri(), null);
+        FSDataOutputStream stream = fileSystem.create(newFile);
+        try {
+            stream.write(new byte[] { 0, 1, 2 });
+        } finally {
+            // Fix: close the stream even when write() throws, so the handle is not leaked.
+            stream.close();
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
index dcc5b06..3ff1fc1 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
@@ -35,6 +35,7 @@ import org.apache.kylin.metadata.measure.LongMutable;
import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/TestHbaseClient.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/TestHbaseClient.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/TestHbaseClient.java
new file mode 100644
index 0000000..0ea037b
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/TestHbaseClient.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.kylin.common.util.Bytes;
+
+/**
+ * Ad-hoc manual test for HBase connectivity, plus a small combination printer.
+ */
+public class TestHbaseClient {
+
+    // When n - k < k, foo() enumerates the complement and print() inverts the output.
+    private static boolean reverse = false;
+
+    /** Prints all bit strings of length n, enumerating over the smaller of k and n - k. */
+    public static void foo(int n, int k) {
+        int t = k;
+        // Fix: reset the flag so state from an earlier foo() call cannot leak into
+        // this one and invert the output incorrectly.
+        reverse = false;
+        if (n - k < k) {
+            t = n - k;
+            reverse = true;
+        }
+        boolean[] flags = new boolean[n];
+        inner(flags, 0, t);
+    }
+
+    /** Prints one combination; a set flag prints "0" (or "1" when reversed). */
+    private static void print(boolean[] flags) {
+        for (int i = 0; i < flags.length; i++) {
+            if (!reverse) {
+                if (flags[i])
+                    System.out.print("0");
+                else
+                    System.out.print("1");
+            } else {
+                if (flags[i])
+                    System.out.print("1");
+                else
+                    System.out.print("0");
+
+            }
+        }
+        System.out.println();
+
+    }
+
+    /** Recursively chooses positions for the remaining marks starting at 'start'. */
+    private static void inner(boolean[] flags, int start, int remaining) {
+        if (remaining <= 0) {
+            print(flags);
+            return;
+        }
+
+        if (flags.length - start < remaining) {
+            return;
+        }
+
+        // write at flags[start]
+        flags[start] = true;
+        inner(flags, start + 1, remaining - 1);
+
+        // not write at flags[start]
+        flags[start] = false;
+        inner(flags, start + 1, remaining);
+    }
+
+    public static void main(String[] args) throws IOException {
+        foo(6, 5);
+        foo(5, 2);
+        foo(3, 0);
+
+        Configuration conf = HBaseConfiguration.create();
+        conf.set("hbase.zookeeper.quorum", "hbase_host");
+        conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
+        HTable table = new HTable(conf, "test1");
+        try {
+            Put put = new Put(Bytes.toBytes("row1"));
+
+            put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
+            put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
+
+            table.put(put);
+        } finally {
+            // Fix: close the table even when put() throws.
+            table.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
new file mode 100644
index 0000000..733de1d
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ *
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class CubeHFileMapper2Test extends LocalFileMetadataTestCase {
+
+ // Cube fixture that must exist in the local test metadata store.
+ String cubeName = "test_kylin_cube_with_slr_ready";
+
+ // Built in setup() from the cube's measure definitions; not used directly in
+ // testBasic() but kept initialized for the mapper's decoding path.
+ MeasureCodec codec;
+ ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ // Captures the single (key, value) pair the mapper emits; filled in by the
+ // MockupMapContext.write() callback.
+ Object[] outKV = new Object[2];
+
+ @Before
+ public void setup() throws Exception {
+ this.createTestMetadata();
+ // hack for distributed cache
+ FileUtils.deleteDirectory(new File("../job/meta"));
+ FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta"));
+ CubeDesc desc = CubeManager.getInstance(getTestConfig()).getCube(cubeName).getDescriptor();
+ codec = new MeasureCodec(desc.getMeasures());
+ }
+
+ @After
+ public void after() throws Exception {
+ cleanupTestMetadata();
+ // Remove the distributed-cache copy created in setup().
+ FileUtils.deleteDirectory(new File("../job/meta"));
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+
+ Configuration hconf = new Configuration();
+ Context context = MockupMapContext.create(hconf, getTestConfig().getMetadataUrl(), cubeName, outKV);
+
+ CubeHFileMapper mapper = new CubeHFileMapper();
+ mapper.setup(context);
+
+ Text key = new Text("not important");
+ Text value = new Text(new byte[] { 2, 2, 51, -79, 1 });
+
+ mapper.map(key, value, context);
+
+ ImmutableBytesWritable outKey = (ImmutableBytesWritable) outKV[0];
+ KeyValue outValue = (KeyValue) outKV[1];
+
+ // The emitted row key must equal the input key byte-for-byte ...
+ assertTrue(Bytes.compareTo(key.getBytes(), 0, key.getLength(), outKey.get(), outKey.getOffset(), outKey.getLength()) == 0);
+
+ // ... and the KeyValue's value must carry the input measure bytes unchanged.
+ assertTrue(Bytes.compareTo(value.getBytes(), 0, value.getLength(), outValue.getValueArray(), outValue.getValueOffset(), outValue.getValueLength()) == 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
new file mode 100644
index 0000000..0c148e7
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.storage.hbase.steps.CubeHFileMapper;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+public class CubeHFileMapperTest {
+
+ // MRUnit harness: feeds (Text, Text) pairs into CubeHFileMapper and collects
+ // its (ImmutableBytesWritable, KeyValue) output.
+ MapDriver<Text, Text, ImmutableBytesWritable, KeyValue> mapDriver;
+
+ private String cube_name = "FLAT_ITEM_CUBE";
+
+ @Before
+ public void setUp() {
+ CubeHFileMapper mapper = new CubeHFileMapper();
+ mapDriver = MapDriver.newMapDriver(mapper);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ @Ignore("not maintaining")
+ public void testMapper2() throws IOException {
+ mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cube_name);
+
+ mapDriver.addInput(new Text("52010tech"), new Text("35.432"));
+
+ List<Pair<ImmutableBytesWritable, KeyValue>> result = mapDriver.run();
+
+ // One input row is expected to fan out into two KeyValues, one per
+ // measure column (usd_amt and item_count below).
+ assertEquals(2, result.size());
+
+ byte[] bytes = { 0, 0, 0, 0, 0, 0, 0, 119, 33, 0, 22, 1, 0, 121, 7 };
+ ImmutableBytesWritable key = new ImmutableBytesWritable(bytes);
+
+ Pair<ImmutableBytesWritable, KeyValue> p1 = result.get(0);
+ Pair<ImmutableBytesWritable, KeyValue> p2 = result.get(1);
+
+ // Both KeyValues share the same row key and column family "cf1".
+ assertEquals(key, p1.getFirst());
+ assertEquals("cf1", new String(p1.getSecond().getFamily()));
+ assertEquals("usd_amt", new String(p1.getSecond().getQualifier()));
+ assertEquals("35.43", new String(p1.getSecond().getValue()));
+
+ assertEquals(key, p2.getFirst());
+ assertEquals("cf1", new String(p2.getSecond().getFamily()));
+ assertEquals("item_count", new String(p2.getSecond().getQualifier()));
+ assertEquals("2", new String(p2.getSecond().getValue()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java
new file mode 100644
index 0000000..d5c3f60
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.security.Credentials;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * @author yangli9
+ *
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MockupMapContext {
+
+ // Builds a minimal Mapper.Context for unit-testing a mapper in isolation.
+ // Only getConfiguration() and write() are functional: write() records the
+ // emitted pair into outKV so the caller can assert on it. Every other
+ // MapContext method throws NotImplementedException.
+ // NOTE(review): metadataUrl is not referenced in this method body — confirm
+ // with callers before removing the parameter.
+ public static Context create(final Configuration hconf, String metadataUrl, String cubeName, final Object[] outKV) {
+
+ // Expose the cube name to the mapper under test via the job configuration.
+ hconf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+
+ return new WrappedMapper().getMapContext(new MapContext() {
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException, InterruptedException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException, InterruptedException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void write(Object key, Object value) throws IOException, InterruptedException {
+ System.out.println("Write -- k:" + key + ", v:" + value);
+ // Capture the emitted pair for the caller's assertions.
+ if (outKV != null) {
+ outKV[0] = key;
+ outKV[1] = value;
+ }
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public TaskAttemptID getTaskAttemptID() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void setStatus(String msg) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getStatus() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public float getProgress() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> counterName) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Counter getCounter(String groupName, String counterName) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ // The only configuration the mapper sees; includes CFG_CUBE_NAME set above.
+ return hconf;
+ }
+
+ @Override
+ public Credentials getCredentials() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public JobID getJobID() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int getNumReduceTasks() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path getWorkingDirectory() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<?> getOutputKeyClass() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<?> getOutputValueClass() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<?> getMapOutputKeyClass() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<?> getMapOutputValueClass() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getJobName() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public RawComparator<?> getSortComparator() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getJar() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public RawComparator<?> getGroupingComparator() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean getJobSetupCleanupNeeded() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean getTaskCleanupNeeded() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean getProfileEnabled() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getProfileParams() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public IntegerRanges getProfileTaskRange(boolean isMap) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getUser() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean getSymlink() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path[] getArchiveClassPaths() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public URI[] getCacheArchives() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public URI[] getCacheFiles() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path[] getLocalCacheArchives() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path[] getLocalCacheFiles() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path[] getFileClassPaths() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String[] getArchiveTimestamps() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String[] getFileTimestamps() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int getMaxMapAttempts() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int getMaxReduceAttempts() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void progress() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public InputSplit getInputSplit() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ throw new NotImplementedException();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
new file mode 100644
index 0000000..78ea71f
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
+
+/**
+ * @author ysong1
+ *
+ */
+public class RangeKeyDistributionJobTest extends LocalFileMetadataTestCase {
+
+ // Hadoop configuration forcing a local, in-process MapReduce run.
+ private Configuration conf;
+
+ @Before
+ public void setup() throws Exception {
+ conf = new Configuration();
+ // Use the local filesystem and the local (in-JVM) job tracker so the job
+ // runs without a cluster.
+ conf.set("fs.default.name", "file:///");
+ conf.set("mapred.job.tracker", "local");
+
+ // for local runner out-of-memory issue
+ conf.set("mapreduce.task.io.sort.mb", "10");
+ createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ cleanupTestMetadata();
+ }
+
+ @Test
+ public void testJob() throws Exception {
+ // Input: pre-built cuboid fixtures; output: computed region split points.
+ String input = "src/test/resources/data/base_cuboid/,src/test/resources/data/6d_cuboid/";
+ String output = "target/test-output/key_distribution_range/";
+ String jobname = "calculate_splits";
+ String cubename = "test_kylin_cube_with_slr_ready";
+
+ // Clear any output from a previous run; MR jobs fail if the dir exists.
+ FileUtil.fullyDelete(new File(output));
+
+ // ToolRunner returns 0 on success.
+ String[] args = { "-input", input, "-output", output, "-jobname", jobname, "-cubename", cubename };
+ assertEquals("Job failed", 0, ToolRunner.run(conf, new RangeKeyDistributionJob(), args));
+ }
+
+}