Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/20 10:48:07 UTC
[kylin] 01/03: KYLIN-3446 Connect to HBase out of Spark
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 76c9c960be542c919301c72b34c7ae5ce6f1ec1c
Author: Yichen Zhou <zh...@gmail.com>
AuthorDate: Wed Aug 8 09:53:29 2018 +0800
KYLIN-3446 Connect to HBase out of Spark
Signed-off-by: shaofengshi <sh...@apache.org>
---
.../apache/kylin/engine/mr/JobBuilderSupport.java | 4 ++
.../kylin/engine/mr/common/AbstractHadoopJob.java | 3 +-
.../kylin/engine/mr/common/BatchConstants.java | 2 +-
.../kylin/storage/hbase/steps/CreateHTableJob.java | 68 ++++++++++++++++++----
.../kylin/storage/hbase/steps/HBaseJobSteps.java | 28 ++++++---
.../kylin/storage/hbase/steps/HBaseSparkSteps.java | 3 +-
.../kylin/storage/hbase/steps/SparkCubeHFile.java | 29 +++++----
7 files changed, 97 insertions(+), 40 deletions(-)
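In short: the create-HTable step now exports the HBase client configuration to an XML file on HDFS, and the Spark HFile step loads that file back instead of opening its own HBase connection from inside Spark. A minimal sketch of the round trip, using only Hadoop's Configuration API (the host name and path below are illustrative, not from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HBaseConfRoundTrip {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("hbase.zookeeper.quorum", "zk-host"); // stand-in for the real HBase settings

            FileSystem fs = FileSystem.get(conf);
            Path confFile = new Path("/tmp/hbase-conf.xml"); // illustrative location

            // Writer side (CreateHTableJob): dump the effective configuration as XML.
            try (FSDataOutputStream out = fs.create(confFile)) {
                conf.writeXml(out);
            }

            // Reader side (SparkCubeHFile): restore it without touching HBase.
            Configuration restored = new Configuration(false);
            try (FSDataInputStream in = fs.open(confFile)) {
                restored.addResource(in);
                // addResource(InputStream) parses lazily; read while the stream is open.
                System.out.println(restored.get("hbase.zookeeper.quorum"));
            }
        }
    }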
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 649b4c3..c6abf16 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -311,6 +311,10 @@ public class JobBuilderSupport {
return getOptimizationRootPath(jobId) + "/cuboid/";
}
+ public String getHBaseConfFilePath(String jobId) {
+ return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+ }
+
// ============================================================================
// static methods also shared by other job flow participant
// ----------------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 329dd56..2976080 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -129,7 +129,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
.create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
protected static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL)
.hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
-
+ public static final Option OPTION_HBASE_CONF_PATH = OptionBuilder.withArgName(BatchConstants.ARG_HBASE_CONF_PATH).hasArg()
+ .isRequired(true).withDescription("HBase config file path").create(BatchConstants.ARG_HBASE_CONF_PATH);
private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
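The new option uses the legacy static OptionBuilder API from commons-cli 1.x, matching the surrounding code. For reference only, the same option under the commons-cli 1.3+ builder API would read as below (a sketch assuming an upgraded dependency, which this patch does not make):

    import org.apache.commons.cli.Option;

    public class HBaseConfOptionSketch {
        // Same shape as OPTION_HBASE_CONF_PATH above, commons-cli 1.3+ style.
        static final Option OPTION_HBASE_CONF_PATH = Option.builder("hbaseConfPath")
                .argName("hbaseConfPath") // BatchConstants.ARG_HBASE_CONF_PATH
                .hasArg()
                .required(true)
                .desc("HBase config file path")
                .build();
    }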
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index a4a52ad..6fe55e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -104,7 +104,7 @@ public interface BatchConstants {
String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
String ARG_META_URL = "metadataUrl";
-
+ String ARG_HBASE_CONF_PATH = "hbaseConfPath";
/**
* logger and counter
*/
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 68aa172..5e17a4c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -26,12 +26,17 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
@@ -46,6 +51,7 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.CuboidShardUtil;
+import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +69,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
CubeDesc cubeDesc = null;
String segmentID = null;
String cuboidModeName = null;
+ String hbaseConfPath = null;
KylinConfig kylinConfig;
Path partitionFilePath;
@@ -74,6 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_PARTITION_FILE_PATH);
options.addOption(OPTION_CUBOID_MODE);
+ options.addOption(OPTION_HBASE_CONF_PATH);
parseOptions(options, args);
partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@ -85,11 +93,12 @@ public class CreateHTableJob extends AbstractHadoopJob {
kylinConfig = cube.getConfig();
segmentID = getOptionValue(OPTION_SEGMENT_ID);
cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+ hbaseConfPath = getOptionValue(OPTION_HBASE_CONF_PATH);
CubeSegment cubeSegment = cube.getSegmentById(segmentID);
byte[][] splitKeys;
Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
-
+
// for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled
Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
@@ -104,14 +113,35 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
cuboidSizeMap = optimizedCuboidSizeMap;
}
-
+
splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment,
partitionFilePath.getParent());
CubeHTableUtil.createHTable(cubeSegment, splitKeys);
+ exportHBaseConfiguration(cubeSegment.getStorageLocationIdentifier());
return 0;
}
+ private void exportHBaseConfiguration(String hbaseTableName) throws Exception {
+
+ Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+ HadoopUtil.healSickConfig(hbaseConf);
+ Job job = Job.getInstance(hbaseConf, hbaseTableName);
+ HTable table = new HTable(hbaseConf, hbaseTableName);
+ HFileOutputFormat2.configureIncrementalLoadMap(job, table);
+
+ logger.info("Saving HBase configuration to " + hbaseConfPath);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ FSDataOutputStream out = null;
+ try {
+ out = fs.create(new Path(hbaseConfPath));
+ job.getConfiguration().writeXml(out);
+ } catch (IOException e) {
+ throw new ExecuteException("Write hbase configuration failed", e);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
//one region for one shard
private static byte[][] getSplitsByRegionCount(int regionCount) {
@@ -124,7 +154,9 @@ public class CreateHTableJob extends AbstractHadoopJob {
return result;
}
- public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder) throws IOException {
+ public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap,
+ final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder)
+ throws IOException {
final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
@@ -157,7 +189,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
if (nRegion != original) {
- logger.info("Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
+ logger.info(
+ "Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
}
}
@@ -188,10 +221,13 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
if (shardNum > nRegion) {
- logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shardNum, nRegion));
+ logger.info(
+ String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d",
+ cuboidId, estimatedSize, shardNum, nRegion));
shardNum = nRegion;
} else {
- logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId, estimatedSize, shardNum));
+ logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId,
+ estimatedSize, shardNum));
}
cuboidShards.put(cuboidId, (short) shardNum);
@@ -204,7 +240,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
for (int i = 0; i < nRegion; ++i) {
- logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
+ logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i,
+ regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
}
CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion);
@@ -222,7 +259,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
// if the size already bigger than threshold, or it will exceed by 20%, cut for next region
regionSplit.add(cuboidId);
- logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId + " (" + cuboidCount + ") cuboids");
+ logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId
+ + " (" + cuboidCount + ") cuboids");
size = 0;
cuboidCount = 0;
regionIndex++;
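The cut rule in this hunk: close the current region once its accumulated size has reached mbPerRegion, or when adding the next cuboid would push it past 120% of that target. A standalone sketch of the same greedy split, with made-up sizes:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class RegionCutSketch {
        // Mirrors the cut rule above; sizes are in MB.
        static List<Long> cutRegions(Map<Long, Double> cuboidSizeMB, double mbPerRegion) {
            List<Long> regionSplit = new ArrayList<>();
            double size = 0;
            for (Map.Entry<Long, Double> e : cuboidSizeMB.entrySet()) {
                if (size >= mbPerRegion || (size + e.getValue()) >= mbPerRegion * 1.2) {
                    regionSplit.add(e.getKey()); // this cuboid starts a new region
                    size = 0;
                }
                size += e.getValue();
            }
            return regionSplit;
        }

        public static void main(String[] args) {
            Map<Long, Double> sizes = new TreeMap<>();
            sizes.put(1L, 60.0);
            sizes.put(2L, 50.0);
            sizes.put(3L, 40.0);
            sizes.put(4L, 80.0);
            System.out.println(cutRegions(sizes, 100)); // prints [3, 4]
        }
    }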
@@ -240,7 +278,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
}
- protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion, final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
+ protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion,
+ final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
if (outputFolder == null) {
logger.warn("outputFolder for hfile split file is null, skip inner region split");
@@ -300,7 +339,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
logger.info(String.format("Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
- System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+ System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN,
+ RowConstants.ROWKEY_CUBOIDID_LEN);
splits.add(split);
accumulatedSize = 0;
j++;
@@ -310,11 +350,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
- SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class), SequenceFile.Writer.valueClass(NullWritable.class));
+ SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
+ SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
+ SequenceFile.Writer.valueClass(NullWritable.class));
for (int i = 0; i < splits.size(); i++) {
//when we compare the rowkey, we compare the row firstly.
- hfilePartitionWriter.append(new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()), NullWritable.get());
+ hfilePartitionWriter.append(
+ new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
+ NullWritable.get());
}
hfilePartitionWriter.close();
}
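One detail worth noting in exportHBaseConfiguration above: the HTable handle exists only so HFileOutputFormat2.configureIncrementalLoadMap can copy the table layout (compression, bloom filter, and block encoding settings) into the job configuration, yet it is never closed. A hedged variant of the same method that releases both the table and the output stream (HTable is Closeable in this HBase line; imports as in the patch):

    private void exportHBaseConfiguration(String hbaseTableName) throws IOException, ExecuteException {
        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
        HadoopUtil.healSickConfig(hbaseConf);
        Job job = Job.getInstance(hbaseConf, hbaseTableName);

        // The table handle only feeds configureIncrementalLoadMap; close it right after.
        try (HTable table = new HTable(hbaseConf, hbaseTableName)) {
            HFileOutputFormat2.configureIncrementalLoadMap(job, table);
        }

        logger.info("Saving HBase configuration to " + hbaseConfPath);
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        try (FSDataOutputStream out = fs.create(new Path(hbaseConfPath))) {
            job.getConfiguration().writeXml(out);
        } catch (IOException e) {
            throw new ExecuteException("Write hbase configuration failed", e);
        }
    }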
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 4fda139..e48090d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -59,8 +59,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
StringBuilder cmd = new StringBuilder();
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
- appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION,
+ getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_HBASE_CONF_PATH, getHBaseConfFilePath(jobId));
createHtableStep.setJobParams(cmd.toString());
createHtableStep.setJobClass(CreateHTableJob.class);
@@ -69,7 +71,8 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
}
// TODO make it abstract
- public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
+ public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments,
+ String jobID, Class<? extends AbstractHadoopJob> clazz) {
final List<String> mergingCuboidPaths = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
@@ -86,7 +89,8 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath);
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
- appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+ "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
mergeCuboidDataStep.setMapReduceJobClass(clazz);
@@ -148,8 +152,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
}
public List<String> getMergingHTables() {
- final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+ final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+ .getMergingSegments((CubeSegment) seg);
+ Preconditions.checkState(mergingSegments.size() > 1,
+ "there should be more than 2 segments to merge, target segment " + seg);
final List<String> mergingHTables = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -158,8 +164,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
}
public List<String> getMergingHDFSPaths() {
- final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+ final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+ .getMergingSegments((CubeSegment) seg);
+ Preconditions.checkState(mergingSegments.size() > 1,
+ "there should be more than 2 segments to merge, target segment " + seg);
final List<String> mergingHDFSPaths = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
@@ -180,11 +188,13 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
}
public String getHFilePath(String jobId) {
- return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
+ return HBaseConnection.makeQualifiedPathInHBaseCluster(
+ getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
}
public String getRowkeyDistributionOutputPath(String jobId) {
- return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
+ return HBaseConnection.makeQualifiedPathInHBaseCluster(
+ getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
}
public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index 622a0e8..be230f0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -22,6 +22,7 @@ import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2;
import org.apache.kylin.engine.spark.SparkExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -48,7 +49,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
-
+ sparkExecutable.setParam(AbstractHadoopJob.OPTION_HBASE_CONF_PATH.getOpt(), getHBaseConfFilePath(jobId));
sparkExecutable.setJobId(jobId);
StringBuilder jars = new StringBuilder();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index fd8459f..c87a739 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -17,7 +17,6 @@
*/
package org.apache.kylin.storage.hbase.steps;
-import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -30,12 +29,11 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -58,7 +56,6 @@ import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.spark.KylinSparkJobListener;
import org.apache.kylin.engine.spark.SparkUtil;
import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -102,6 +99,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
options.addOption(OPTION_META_URL);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_PARTITION_FILE_PATH);
+ options.addOption(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
}
@Override
@@ -117,11 +115,12 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
+ final String hbaseConfFile = optionsHelper.getOptionValue(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class,
KeyValue.class, RowKeyWritable.class };
- SparkConf conf = new SparkConf().setAppName("Covnerting Hfile for:" + cubeName + " segment " + segmentId);
+ SparkConf conf = new SparkConf().setAppName("Converting HFile for:" + cubeName + " segment " + segmentId);
//serialization conf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
@@ -171,17 +170,15 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
}
logger.info("There are " + keys.size() + " split keys, totally " + (keys.size() + 1) + " hfiles");
- Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
- HadoopUtil.healSickConfig(hbaseConf);
- Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
- job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
- HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
- try {
- HFileOutputFormat2.configureIncrementalLoadMap(job, table);
- } catch (IOException ioe) {
- // this can be ignored.
- logger.debug(ioe.getMessage(), ioe);
- }
+
+ //HBase conf
+ logger.info("Loading HBase configuration from:" + hbaseConfFile);
+ FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
+
+ Configuration hbaseJobConf = new Configuration();
+ hbaseJobConf.addResource(confInput);
+ hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+ Job job = new Job(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
FileOutputFormat.setOutputPath(job, new Path(outputPath));
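A caveat on this reader path: Configuration.addResource(InputStream) parses the XML lazily, on first property access, so the code above relies on confInput staying open until the configuration is actually read. A hedged variant that forces the parse and then releases the HDFS stream (fs, hbaseConfFile, and cubeSegment as in the patch):

    // Force-materialize the exported HBase settings, then release the stream.
    Configuration hbaseJobConf = new Configuration();
    try (FSDataInputStream confInput = fs.open(new Path(hbaseConfFile))) {
        hbaseJobConf.addResource(confInput);
        hbaseJobConf.size(); // any read triggers the lazy XML parse while the stream is open
    }
    hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
    Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());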