You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/20 10:48:07 UTC

[kylin] 01/03: KYLIN-3446 Connect to HBase out of Spark Signed-off-by: shaofengshi

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 76c9c960be542c919301c72b34c7ae5ce6f1ec1c
Author: Yichen Zhou <zh...@gmail.com>
AuthorDate: Wed Aug 8 09:53:29 2018 +0800

    KYLIN-3446 Connect to HBase out of Spark
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  4 ++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |  3 +-
 .../kylin/engine/mr/common/BatchConstants.java     |  2 +-
 .../kylin/storage/hbase/steps/CreateHTableJob.java | 68 ++++++++++++++++++----
 .../kylin/storage/hbase/steps/HBaseJobSteps.java   | 28 ++++++---
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |  3 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  | 29 +++++----
 7 files changed, 97 insertions(+), 40 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 649b4c3..c6abf16 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -311,6 +311,10 @@ public class JobBuilderSupport {
         return getOptimizationRootPath(jobId) + "/cuboid/";
     }
 
+    public String getHBaseConfFilePath(String jobId) {
+       return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+    }
+
     // ============================================================================
     // static methods also shared by other job flow participant
     // ----------------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 329dd56..2976080 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -129,7 +129,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
     protected static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL)
             .hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
-
+    public static final Option OPTION_HBASE_CONF_PATH = OptionBuilder.withArgName(BatchConstants.ARG_HBASE_CONF_PATH).hasArg()
+            .isRequired(true).withDescription("HBase config file path").create(BatchConstants.ARG_HBASE_CONF_PATH);
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index a4a52ad..6fe55e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -104,7 +104,7 @@ public interface BatchConstants {
     String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
     String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
     String ARG_META_URL = "metadataUrl";
-
+    String ARG_HBASE_CONF_PATH = "hbaseConfPath";
     /**
      * logger and counter
      */
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 68aa172..5e17a4c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -26,12 +26,17 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -46,6 +51,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.CuboidShardUtil;
+import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +69,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     CubeDesc cubeDesc = null;
     String segmentID = null;
     String cuboidModeName = null;
+    String hbaseConfPath = null;
     KylinConfig kylinConfig;
     Path partitionFilePath;
 
@@ -74,6 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(OPTION_CUBOID_MODE);
+        options.addOption(OPTION_HBASE_CONF_PATH);
         parseOptions(options, args);
 
         partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@ -85,11 +93,12 @@ public class CreateHTableJob extends AbstractHadoopJob {
         kylinConfig = cube.getConfig();
         segmentID = getOptionValue(OPTION_SEGMENT_ID);
         cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+        hbaseConfPath = getOptionValue(OPTION_HBASE_CONF_PATH);
         CubeSegment cubeSegment = cube.getSegmentById(segmentID);
 
         byte[][] splitKeys;
         Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
-        
+
         // for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled
         Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
         if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
@@ -104,14 +113,35 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
             cuboidSizeMap = optimizedCuboidSizeMap;
         }
-        
+
         splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment,
                 partitionFilePath.getParent());
 
         CubeHTableUtil.createHTable(cubeSegment, splitKeys);
+        exportHBaseConfiguration(cubeSegment.getStorageLocationIdentifier());
         return 0;
     }
 
+    private void exportHBaseConfiguration(String hbaseTableName) throws Exception {
+
+        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+        HadoopUtil.healSickConfig(hbaseConf);
+        Job job = Job.getInstance(hbaseConf, hbaseTableName);
+        HTable table = new HTable(hbaseConf, hbaseTableName);
+        HFileOutputFormat2.configureIncrementalLoadMap(job, table);
+
+        logger.info("Saving HBase configuration to " + hbaseConfPath);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        FSDataOutputStream out = null;
+        try {
+            out = fs.create(new Path(hbaseConfPath));
+            job.getConfiguration().writeXml(out);
+        } catch (IOException e) {
+            throw new ExecuteException("Write hbase configuration failed", e);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
 
     //one region for one shard
     private static byte[][] getSplitsByRegionCount(int regionCount) {
@@ -124,7 +154,9 @@ public class CreateHTableJob extends AbstractHadoopJob {
         return result;
     }
 
-    public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder) throws IOException {
+    public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap,
+            final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder)
+            throws IOException {
 
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
         float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
@@ -157,7 +189,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
 
             if (nRegion != original) {
-                logger.info("Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
+                logger.info(
+                        "Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
             }
         }
 
@@ -188,10 +221,13 @@ public class CreateHTableJob extends AbstractHadoopJob {
                 }
 
                 if (shardNum > nRegion) {
-                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shardNum, nRegion));
+                    logger.info(
+                            String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d",
+                                    cuboidId, estimatedSize, shardNum, nRegion));
                     shardNum = nRegion;
                 } else {
-                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId, estimatedSize, shardNum));
+                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId,
+                            estimatedSize, shardNum));
                 }
 
                 cuboidShards.put(cuboidId, (short) shardNum);
@@ -204,7 +240,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
 
             for (int i = 0; i < nRegion; ++i) {
-                logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
+                logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i,
+                        regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
             }
 
             CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion);
@@ -222,7 +259,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
                 if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
                     // if the size already bigger than threshold, or it will exceed by 20%, cut for next region
                     regionSplit.add(cuboidId);
-                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId + " (" + cuboidCount + ") cuboids");
+                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId
+                            + " (" + cuboidCount + ") cuboids");
                     size = 0;
                     cuboidCount = 0;
                     regionIndex++;
@@ -240,7 +278,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
         }
     }
 
-    protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion, final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
+    protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion,
+            final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
 
         if (outputFolder == null) {
             logger.warn("outputFolder for hfile split file is null, skip inner region split");
@@ -300,7 +339,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
                     logger.info(String.format("Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
                     byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
                     BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
-                    System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+                    System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN,
+                            RowConstants.ROWKEY_CUBOIDID_LEN);
                     splits.add(split);
                     accumulatedSize = 0;
                     j++;
@@ -310,11 +350,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
         }
 
-        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class), SequenceFile.Writer.valueClass(NullWritable.class));
+        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
+                SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
+                SequenceFile.Writer.valueClass(NullWritable.class));
 
         for (int i = 0; i < splits.size(); i++) {
             //when we compare the rowkey, we compare the row firstly.
-            hfilePartitionWriter.append(new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()), NullWritable.get());
+            hfilePartitionWriter.append(
+                    new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
+                    NullWritable.get());
         }
         hfilePartitionWriter.close();
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 4fda139..e48090d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -59,8 +59,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION,
+                getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HBASE_CONF_PATH, getHBaseConfFilePath(jobId));
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);
@@ -69,7 +71,8 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     }
 
     // TODO make it abstract
-    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
+    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments,
+            String jobID, Class<? extends AbstractHadoopJob> clazz) {
         final List<String> mergingCuboidPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
@@ -86,7 +89,8 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
         mergeCuboidDataStep.setMapReduceJobClass(clazz);
@@ -148,8 +152,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     }
 
     public List<String> getMergingHTables() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+                .getMergingSegments((CubeSegment) seg);
+        Preconditions.checkState(mergingSegments.size() > 1,
+                "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -158,8 +164,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     }
 
     public List<String> getMergingHDFSPaths() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+                .getMergingSegments((CubeSegment) seg);
+        Preconditions.checkState(mergingSegments.size() > 1,
+                "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHDFSPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
@@ -180,11 +188,13 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     }
 
     public String getHFilePath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(
+                getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
     }
 
     public String getRowkeyDistributionOutputPath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(
+                getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
     }
 
     public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index 622a0e8..be230f0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -22,6 +22,7 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2;
 import org.apache.kylin.engine.spark.SparkExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -48,7 +49,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
         sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
         sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
                 getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
-
+        sparkExecutable.setParam(AbstractHadoopJob.OPTION_HBASE_CONF_PATH.getOpt(), getHBaseConfFilePath(jobId));
         sparkExecutable.setJobId(jobId);
 
         StringBuilder jars = new StringBuilder();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index fd8459f..c87a739 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -17,7 +17,6 @@
 */
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,12 +29,11 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -58,7 +56,6 @@ import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
 import org.apache.kylin.engine.spark.SparkUtil;
 import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -102,6 +99,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
         options.addOption(OPTION_PARTITION_FILE_PATH);
+        options.addOption(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
     }
 
     @Override
@@ -117,11 +115,12 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
         final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
+        final String hbaseConfFile = optionsHelper.getOptionValue(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class,
                 KeyValue.class, RowKeyWritable.class };
 
-        SparkConf conf = new SparkConf().setAppName("Covnerting Hfile for:" + cubeName + " segment " + segmentId);
+        SparkConf conf = new SparkConf().setAppName("Converting HFile for:" + cubeName + " segment " + segmentId);
         //serialization conf
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
@@ -171,17 +170,15 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         }
 
         logger.info("There are " + keys.size() + " split keys, totally " + (keys.size() + 1) + " hfiles");
-        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
-        HadoopUtil.healSickConfig(hbaseConf);
-        Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
-        job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
-        HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
-        try {
-            HFileOutputFormat2.configureIncrementalLoadMap(job, table);
-        } catch (IOException ioe) {
-            // this can be ignored.
-            logger.debug(ioe.getMessage(), ioe);
-        }
+
+        //HBase conf
+        logger.info("Loading HBase configuration from:" + hbaseConfFile);
+        FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
+
+        Configuration hbaseJobConf = new Configuration();
+        hbaseJobConf.addResource(confInput);
+        hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+        Job job = new Job(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
 
         FileOutputFormat.setOutputPath(job, new Path(outputPath));