You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/06/18 06:13:22 UTC
[kylin] branch master updated: [KYLIN-4947] Implement spark engine
for cube optimization jobs.
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new efedf07 [KYLIN-4947] Implement spark engine for cube optimization jobs.
efedf07 is described below
commit efedf07043c704cbdfa04ac3af3d73cec866cb76
Author: yangjiang <ya...@ebay.com>
AuthorDate: Wed Apr 7 15:39:04 2021 +0800
[KYLIN-4947] Implement spark engine for cube optimization jobs.
---
.../org/apache/kylin/common/KylinConfigBase.java | 8 +
.../apache/kylin/cube/cuboid/CuboidScheduler.java | 3 +-
.../org/apache/kylin/cube/cuboid/CuboidUtil.java | 1 +
.../kylin/cube/cuboid/TreeCuboidScheduler.java | 7 +-
.../org/apache/kylin/cube/kv/RowKeyDecoder.java | 3 +-
.../kylin/job/constant/ExecutableConstants.java | 6 +
.../kylin/engine/mr/BatchOptimizeJobBuilder2.java | 1 +
.../kylin/engine/mr/common/BatchConstants.java | 5 +-
.../kylin/engine/mr/common/CubeStatsWriter.java | 6 +
.../kylin/engine/spark/KylinKryoRegistrator.java | 5 +
.../engine/spark/SparkBatchCubingEngine2.java | 7 +-
.../spark/SparkBatchOptimizeJobBuilder2.java | 211 ++++++++++++
.../kylin/engine/spark/SparkBuildDictionary.java | 4 +-
.../SparkCalculateStatsFromBaseCuboidJob.java | 354 +++++++++++++++++++++
.../kylin/engine/spark/SparkCubingByLayer.java | 10 +-
.../engine/spark/SparkCubingByLayerForOpt.java | 269 ++++++++++++++++
.../kylin/engine/spark/SparkCubingMerge.java | 4 +-
.../apache/kylin/engine/spark/SparkExecutable.java | 10 +-
.../kylin/engine/spark/SparkFactDistinct.java | 4 +-
.../spark/SparkFilterRecommendCuboidDataJob.java | 168 ++++++++++
.../apache/kylin/engine/spark/SparkFunction.java | 104 ++++++
.../kylin/engine/spark/SparkMergingDictionary.java | 4 +-
.../kylin/engine/spark/SparkUHCDictionary.java | 4 +-
.../SparkUpdateShardForOldCuboidDataStep.java | 223 +++++++++++++
.../org/apache/kylin/engine/spark/SparkUtil.java | 99 +++++-
.../kylin/storage/hbase/steps/BulkLoadJob.java | 2 +-
.../kylin/storage/hbase/steps/CreateHTableJob.java | 2 +
.../hbase/steps/HBaseSparkOutputTransition.java | 26 +-
.../kylin/storage/hbase/steps/SparkCubeHFile.java | 4 +-
29 files changed, 1518 insertions(+), 36 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8d22fa3..767ae8b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1731,6 +1731,14 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", FALSE));
}
+ public boolean isSparkOptimizeCubeViaSparkEnable() { // read by SparkBatchCubingEngine2.createBatchOptimizeJob to pick SparkBatchOptimizeJobBuilder2 over BatchOptimizeJobBuilder2
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-optimize-cube-enabled", TRUE)); // NOTE(review): TRUE is passed as the fallback, so this looks enabled unless the property is set otherwise - confirm getOptional semantics
+ }
+
+ public boolean isUseSparkCalculateStatsEnable() { // read by SparkBatchOptimizeJobBuilder2.build() to choose the Spark statistics step over the non-Spark one
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-calculate-stats-enabled", TRUE)); // NOTE(review): TRUE is passed as the fallback, so this looks enabled unless the property is set otherwise - confirm getOptional semantics
+ }
+
public boolean isFlinkSanityCheckEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 5c57fad..8ac19fd 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -18,6 +18,7 @@
package org.apache.kylin.cube.cuboid;
+import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -32,7 +33,7 @@ import org.apache.kylin.shaded.com.google.common.collect.Lists;
/**
* Defines a cuboid tree, rooted by the base cuboid. A parent cuboid generates its child cuboids.
*/
-abstract public class CuboidScheduler {
+abstract public class CuboidScheduler implements Serializable {
public static CuboidScheduler getInstance(CubeDesc cubeDesc) {
String clzName = cubeDesc.getConfig().getCuboidScheduler();
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
index aae3129..e40ea5f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
@@ -32,6 +32,7 @@ import org.apache.kylin.shaded.com.google.common.collect.Maps;
public class CuboidUtil {
+ // for the i-th cuboid, get the index of its j-th '1' bit
public static Integer[][] getCuboidBitSet(Long[] cuboidIds, int nRowKey) {
Preconditions.checkArgument(nRowKey < Long.SIZE,
"the size of row key could not be large than " + (Long.SIZE - 1));
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java
index 40242c4..b18e38b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java
@@ -19,6 +19,7 @@
package org.apache.kylin.cube.cuboid;
import java.io.PrintWriter;
+import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -71,7 +72,7 @@ public class TreeCuboidScheduler extends CuboidScheduler {
return cuboidTree.isValid(requestCuboid);
}
- public static class CuboidTree {
+ public static class CuboidTree implements Serializable {
private int treeLevels;
private TreeNode root;
@@ -232,7 +233,7 @@ public class TreeCuboidScheduler extends CuboidScheduler {
}
}
- public static class TreeNode {
+ public static class TreeNode implements Serializable {
@JsonProperty("cuboid_id")
long cuboidId;
@JsonIgnore
@@ -290,7 +291,7 @@ public class TreeCuboidScheduler extends CuboidScheduler {
/**
* Compare cuboid according to the cuboid data row count
*/
- public static class CuboidCostComparator implements Comparator<Long> {
+ public static class CuboidCostComparator implements Comparator<Long>, Serializable {
private Map<Long, Long> cuboidStatistics;
public CuboidCostComparator(Map<Long, Long> cuboidStatistics) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 71ad4bf..516ce7f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -19,6 +19,7 @@
package org.apache.kylin.cube.kv;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -34,7 +35,7 @@ import org.apache.kylin.metadata.model.TblColRef;
* @author xjiang
*
*/
-public class RowKeyDecoder {
+public class RowKeyDecoder implements Serializable {
private final CubeDesc cubeDesc;
private final RowKeyColumnIO colIO;
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 4deab99..aa9e875 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -46,12 +46,17 @@ public final class ExecutableConstants {
public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table";
public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
+ public static final String STEP_NAME_FACT_DISTINCT_COLUMNS_SPARK = "Extract Fact Table Distinct Columns With Spark";
public static final String STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID = "Calculate Statistics from Base Cuboid";
public static final String STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION = "Filter Recommend Cuboid Data for Optimization";
+ public static final String STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION_SPARK = "Filter Recommend Cuboid Data for Optimization with Spark";
+ public static final String STEP_NAME_CALCULATE_STATS_FROM_BASECUBOID_SPARK = "Calculate Stats From BaseCuboid Step with Spark";
public static final String STEP_NAME_UPDATE_OLD_CUBOID_SHARD = "Update Old Cuboid Shard for Optimization";
+ public static final String STEP_NAME_UPDATE_OLD_CUBOID_SHARD_SPARK = "Update Old Cuboid Shard for Optimization With Spark";
public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid";
public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem";
public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark";
+ public static final String STEP_NAME_OPTIMIZE_SPARK_CUBE = "Optimize Cube with Spark";
public static final String STEP_NAME_BUILD_FLINK_CUBE = "Build Cube with Flink";
public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid";
public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
@@ -64,6 +69,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
public static final String STEP_NAME_MERGE_UPDATE_DICTIONARY = "Update Dictionary Data";
public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization";
+ public static final String STEP_NAME_MERGE_STATISTICS_WITH_SPARK = "Merge Cuboid Statistics with Spark for Optimization";
public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
index 9e8b9e8..18a9ded 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
@@ -64,6 +64,7 @@ public class BatchOptimizeJobBuilder2 extends JobBuilderSupport {
// Phase 1: Prepare base cuboid data from old segment
String oldcuboidRootPath = getCuboidRootPath(oldSegment) + "*";
+ // writes to optimizeCuboidRootPath + "/base_cuboid" or optimizeCuboidRootPath + "/old"
result.addTask(createFilterRecommendCuboidDataStep(oldcuboidRootPath, optimizeCuboidRootPath));
// Phase 2: Prepare dictionary and statistics for new segment
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d0e2936..dd2dc6e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -94,7 +94,7 @@ public interface BatchConstants {
String ARG_CUBE_NAME = "cubename";
String ARG_II_NAME = "iiname";
String ARG_SEGMENT_NAME = "segmentname";
- String ARG_SEGMENT_ID = "segmentid";
+ String ARG_SEGMENT_ID = "segmentId";
String ARG_PARTITION = "partitions";
String ARG_STATS_ENABLED = "statisticsenabled";
String ARG_STATS_OUTPUT = "statisticsoutput";
@@ -107,7 +107,7 @@ public interface BatchConstants {
String ARG_TABLE_NAME = "tableName";
String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
- String ARG_META_URL = "metadataUrl";
+ String ARG_META_URL = "metaUrl";
String ARG_HBASE_CONF_PATH = "hbaseConfPath";
String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
String ARG_COUNTER_OUTPUT = "counterOutput";
@@ -116,6 +116,7 @@ public interface BatchConstants {
String ARG_BASE64_ENCODED_SQL = "base64EncodedSql";
String ARG_GLOBAL_DIC_PART_REDUCE_STATS = "global_dict_part_reduce_stats";
String ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT = "global_dict_max_distinct_count";
+ String ARG_HIVE_TABLE = "hiveTable";
/**
* logger and counter
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index 3c41e1f..4348bdf 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -35,9 +35,13 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CubeStatsWriter {
+ protected static final Logger logger = LoggerFactory.getLogger(CubeStatsWriter.class);
+
public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0);
@@ -62,6 +66,8 @@ public class CubeStatsWriter {
Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
int shard) throws IOException {
Path seqFilePath = new Path(outputPath, BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard);
+ logger.info("writePartialCuboidStatistics for cuboid: " + cuboidHLLMap.keySet().toString());
+ logger.info("writePartialCuboidStatistics Path: " + seqFilePath);
writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
mapperOverlapRatio, 0);
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index a50c11d..5cf1e2f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -86,6 +86,8 @@ public class KylinKryoRegistrator implements KryoRegistrator {
kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class);
kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class);
kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class);
+ kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.TreeNode.class);
+ kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.CuboidCostComparator.class);
kylinClassByReflection1(kyroClasses);
kylinClassByReflection2(kyroClasses);
@@ -146,6 +148,8 @@ public class KylinKryoRegistrator implements KryoRegistrator {
kyroClasses.add(org.apache.kylin.cube.common.RowKeySplitter.class);
kyroClasses.add(org.apache.kylin.cube.cuboid.Cuboid.class);
kyroClasses.add(org.apache.kylin.cube.cuboid.DefaultCuboidScheduler.class);
+ kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.class);
+ kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.CuboidTree.class);
kyroClasses.add(org.apache.kylin.cube.gridtable.TrimmedDimensionSerializer.class);
kyroClasses.add(org.apache.kylin.cube.kv.AbstractRowKeyEncoder.class);
kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class);
@@ -155,6 +159,7 @@ public class KylinKryoRegistrator implements KryoRegistrator {
kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class);
kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class);
kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class);
+ kyroClasses.add(org.apache.kylin.cube.kv.RowKeyDecoder.class);
kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.class);
kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class);
kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.class);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
index d3afb03..24f3f91 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
@@ -51,8 +51,11 @@ public class SparkBatchCubingEngine2 implements IBatchCubingEngine {
@Override
public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
- //TODO use Spark to optimize
- return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+ if (optimizeSegment.getConfig().isSparkOptimizeCubeViaSparkEnable()) {
+ return new SparkBatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+ } else {
+ return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+ }
}
@Override
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchOptimizeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchOptimizeJobBuilder2.java
new file mode 100644
index 0000000..2971857
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchOptimizeJobBuilder2.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.CopyDictionaryStep;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.mr.steps.MergeStatisticsWithOldStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterOptimizeStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkBatchOptimizeJobBuilder2 extends JobBuilderSupport { // assembles the optimize job for a segment, using Spark executables for the filter, statistics, shard-update and cubing steps
+ private static final Logger logger = LoggerFactory.getLogger(SparkBatchOptimizeJobBuilder2.class);
+ // Output side whose addStepPhase2/3/4 hooks are invoked from build().
+ private final ISparkOutput.ISparkBatchOptimizeOutputSide outputSide;
+ /** Creates a builder for {@code optimizeSegment}; both arguments are handed to the superclass constructor. */
+ public SparkBatchOptimizeJobBuilder2(CubeSegment optimizeSegment, String submitter) {
+ super(optimizeSegment, submitter);
+ this.outputSide = SparkUtil.getBatchOptimizeOutputSide2(seg);
+ }
+ /** Builds the optimize job: filter old cuboid data, copy dictionary, calculate and merge statistics, create HTable, update old cuboid shards, cube by layer, update metadata, clean up. */
+ public CubingJob build() {
+ logger.info("Spark new job to Optimize segment " + seg);
+ final CubingJob result = CubingJob.createOptimizeJob(seg, submitter, config);
+ final String jobId = result.getId();
+ final String cuboidRootPath = getCuboidRootPath(jobId);
+
+ final String optimizeCuboidRootPath = getOptimizationCuboidPath(jobId);
+
+ CubeSegment oldSegment = seg.getCubeInstance().getOriginalSegmentToOptimize(seg);
+ Preconditions.checkNotNull(oldSegment, "cannot find the original segment to be optimized by " + seg);
+
+ // Phase 1: Prepare base cuboid data from old segment
+ String oldcuboidRootPath = getCuboidRootPath(oldSegment) + "*"; // "*" suffix - presumably a glob over the old segment's cuboid directories; confirm
+ // Filter the old cuboid data into optimizeCuboidRootPath + "/base_cuboid" and optimizeCuboidRootPath + "/old"
+ result.addTask(createFilterRecommendCuboidDataStep(oldcuboidRootPath, optimizeCuboidRootPath, jobId));
+
+ // Phase 2: Prepare dictionary and statistics for new segment
+ result.addTask(createCopyDictionaryStep());
+ String optStatsSourcePath = getBaseCuboidPath(optimizeCuboidRootPath);
+ String optStatsDstPath = getOptimizationStatisticsPath(jobId);
+ // Calculate statistics: the Spark step when enabled in the segment config, otherwise the inherited createCalculateStatsFromBaseCuboid step
+ if (seg.getConfig().isUseSparkCalculateStatsEnable()) {
+ result.addTask(createCalculateStatsFromBaseCuboidStepWithSpark(optStatsSourcePath, optStatsDstPath,
+ CuboidModeEnum.RECOMMEND_MISSING, jobId));
+ } else {
+ result.addTask(createCalculateStatsFromBaseCuboid(optStatsSourcePath, optStatsDstPath,
+ CuboidModeEnum.RECOMMEND_MISSING));
+ }
+
+ result.addTask(createMergeStatisticsWithOldStep(jobId, optStatsDstPath, getStatisticsPath(jobId)));
+ outputSide.addStepPhase2_CreateHTable(result);
+
+ result.addTask(createUpdateShardForOldCuboidDataStep(optimizeCuboidRootPath, cuboidRootPath, jobId));
+
+ // Phase 3: Build Cube for Missing Cuboid Data
+ addLayerCubingSteps(result, jobId, CuboidModeEnum.RECOMMEND_MISSING_WITH_BASE,
+ SparkUtil.generateFilePath(PathNameCuboidBase, cuboidRootPath), cuboidRootPath); // layer cubing
+
+ outputSide.addStepPhase3_BuildCube(result);
+
+ // Phase 4: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterOptimizeStep(jobId));
+ outputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+ /** Configures a Spark step running SparkCalculateStatsFromBaseCuboidJob with the given paths, cuboid mode and the configured sampling percent. */
+ private SparkExecutable createCalculateStatsFromBaseCuboidStepWithSpark(String inputPath, String outputPath,
+ CuboidModeEnum recommendMissing, String jobId) {
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASECUBOID_SPARK);
+
+ sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_CUBE_NAME.getOpt(),
+ seg.getRealization().getName());
+ sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_INPUT_PATH.getOpt(), inputPath);
+ sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+ sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_META_URL.getOpt(),
+ getSegmentMetadataUrl(seg.getConfig(), jobId));
+ sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_JOB_MODE.getOpt(),
+ recommendMissing.toString());
+ sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_SAMPLING_PERCENT.getOpt(),
+ String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+ // the SparkExecutable needs the job id
+ sparkExecutable.setJobId(jobId);
+
+ sparkExecutable.setClassName(SparkCalculateStatsFromBaseCuboidJob.class.getName());
+
+ return sparkExecutable;
+ }
+ /** Configures a Spark step running SparkFilterRecommendCuboidDataJob with the given input and output paths. */
+ private SparkExecutable createFilterRecommendCuboidDataStep(String inputPath, String outputPath , String jobId) {
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION_SPARK);
+
+ sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_INPUT_PATH.getOpt(), inputPath);
+ sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+ sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+ sparkExecutable.setClassName(SparkFilterRecommendCuboidDataJob.class.getName());
+ // the SparkExecutable needs the job id
+ sparkExecutable.setJobId(jobId);
+
+ return sparkExecutable;
+ }
+ /** Creates the UpdateCubeInfoAfterOptimizeStep, tagged with the cube name, segment id and cubing job id. */
+ private UpdateCubeInfoAfterOptimizeStep createUpdateCubeInfoAfterOptimizeStep(String jobId) {
+ final UpdateCubeInfoAfterOptimizeStep result = new UpdateCubeInfoAfterOptimizeStep();
+ result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+
+ return result;
+ }
+ /** Adds a single SparkCubingByLayerForOpt step to {@code result}, configured with the given input/output paths and cuboid mode. */
+ private void addLayerCubingSteps(CubingJob result, String jobId, CuboidModeEnum mode, String input, String output) {
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
+ sparkExecutable.setClassName(SparkCubingByLayerForOpt.class.getName());
+ configureSparkJob(seg, sparkExecutable, jobId, input, output, mode);
+ result.addTask(sparkExecutable);
+ }
+ /** Configures a Spark step running SparkUpdateShardForOldCuboidDataStep with the given input and output paths. */
+ private SparkExecutable createUpdateShardForOldCuboidDataStep(String inputPath, String outputPath, String jobId) {
+ final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_UPDATE_OLD_CUBOID_SHARD_SPARK);
+
+ sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_CUBE_NAME.getOpt(),
+ seg.getRealization().getName());
+ sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_INPUT_PATH.getOpt(), inputPath);
+ sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+ sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_META_URL.getOpt(),
+ getSegmentMetadataUrl(seg.getConfig(), jobId));
+ // the SparkExecutable needs the job id
+ sparkExecutable.setJobId(jobId);
+ sparkExecutable.setClassName(SparkUpdateShardForOldCuboidDataStep.class.getName());
+
+ return sparkExecutable;
+ }
+ /** Creates the MergeStatisticsWithOldStep, given the optimize statistics path and the folder for the merged statistics. */
+ private MergeStatisticsWithOldStep createMergeStatisticsWithOldStep(String jobId, String optStatsPath,
+ String mergedStatisticsFolder) {
+ MergeStatisticsWithOldStep result = new MergeStatisticsWithOldStep();
+ result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS_WITH_OLD);
+
+ CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setStatisticsPath(optStatsPath, result.getParams());
+ CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
+
+ return result;
+ }
+ /** Creates the CopyDictionaryStep, tagged with this segment's cube name and segment id. */
+ private AbstractExecutable createCopyDictionaryStep() {
+ CopyDictionaryStep result = new CopyDictionaryStep();
+ result.setName(ExecutableConstants.STEP_NAME_COPY_DICTIONARY);
+
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ return result;
+ }
+ /** Sets the cube, segment, metadata URL, path and cuboid-mode options, the job id, the additional Spark jars and the step name on {@code sparkExecutable}. */
+ private void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable, final String jobId,
+ final String input, String output, CuboidModeEnum mode) {
+
+ sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_META_URL.getOpt(),
+ getSegmentMetadataUrl(seg.getConfig(), jobId));
+ sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_OUTPUT_PATH.getOpt(), output);
+ sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_INPUT_PATH.getOpt(), input);
+ sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_CUBOID_MODE.getOpt(), mode.toString());
+ sparkExecutable.setJobId(jobId);
+
+ StringBuilder jars = new StringBuilder();
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+ sparkExecutable.setJars(jars.toString());
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_OPTIMIZE_SPARK_CUBE + ":" + seg.toString());
+ }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java
index 25020eb..2d29e1a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java
@@ -93,14 +93,14 @@ public class SparkBuildDictionary extends AbstractApplication implements Seriali
public static final Option OPTION_DICT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
.isRequired(true).withDescription("Cube dictionary output path").create(BatchConstants.ARG_DICT_PATH);
public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
- .withDescription("Cube Segment Id").create("segmentId");
+ .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
public static final Option OPTION_CUBING_JOB_ID = OptionBuilder
.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(true)
.withDescription("Cubing job id").create(BatchConstants.ARG_CUBING_JOB_ID);
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
- .withDescription("HDFS metadata url").create("metaUrl");
+ .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUTPUT).hasArg()
.isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUTPUT);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCalculateStatsFromBaseCuboidJob.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCalculateStatsFromBaseCuboidJob.java
new file mode 100644
index 0000000..838efc0
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCalculateStatsFromBaseCuboidJob.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
+import org.apache.kylin.shaded.com.google.common.hash.Hasher;
+import org.apache.kylin.shaded.com.google.common.hash.Hashing;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import scala.Tuple2;
+
+public class SparkCalculateStatsFromBaseCuboidJob extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(SparkCalculateStatsFromBaseCuboidJob.class);
+
+ protected Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true)
+ .create(BatchConstants.ARG_SEGMENT_ID);
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+ .isRequired(true).create(BatchConstants.ARG_INPUT);
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+ public static final Option OPTION_JOB_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE).hasArg().isRequired(true)
+ .create(BatchConstants.ARG_CUBOID_MODE);
+ public static final Option OPTION_SAMPLING_PERCENT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg()
+ .isRequired(true).create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+
+ private Options options;
+
+ private int samplingPercent;
+ private int rowCount = 0;
+ private long[] rowHashCodesLong = null;
+ //about details of the new algorithm, please see KYLIN-2518
+ private boolean isUsePutRowKeyToHllNewAlgorithm;
+
+ private HLLCounter[] allCuboidsHLL = null;
+ private Long[] cuboidIds;
+ private Integer[][] allCuboidsBitSet = null;
+ private HashFunction hf = null;
+
+ protected int nRowKey;
+ protected long baseCuboidId;
+
+ RowKeyDecoder rowKeyDecoder;
+
+ public SparkCalculateStatsFromBaseCuboidJob() { // registers the seven command-line options, all declared as required
+ options = new Options();
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_JOB_MODE);
+ options.addOption(OPTION_SAMPLING_PERCENT);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ String input = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ String output = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ String jobMode = optionsHelper.getOptionValue(OPTION_JOB_MODE);
+ this.samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_SAMPLING_PERCENT));
+
+ SparkConf sparkConf = SparkUtil.setKryoSerializerInConf();
+ sparkConf.setAppName("Kylin_Calculate_Statics_From_BaseCuboid_Data_" + cubeName + "_With_Spark");
+
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
+ sc.sc().addSparkListener(jobListener);
+
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(output));
+
+ SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+ KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ CubeSegment optSegment;
+ int cubeStatsHLLPrecision;
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(config)) {
+
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeInstance cube = cubeManager.getCube(cubeName);
+ CubeDesc cubeDesc = cube.getDescriptor();
+ optSegment = cube.getSegmentById(segmentId);
+
+ baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+ nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+ Set<Long> cuboids = cube.getCuboidsByMode(jobMode);
+ if (cuboids.size() == 0) {
+ Set<Long> current = cube.getCuboidsByMode(CuboidModeEnum.CURRENT);
+ current.removeAll(cube.getCuboidsRecommend());
+ cuboids = current;
+ }
+ cuboidIds = cuboids.toArray(new Long[cuboids.size()]);
+ allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
+ cubeStatsHLLPrecision = config.getCubeStatsHLLPrecision();
+
+
+ allCuboidsHLL = new HLLCounter[cuboidIds.length];
+ for (int i = 0; i < cuboidIds.length; i++) {
+ allCuboidsHLL[i] = new HLLCounter(cubeStatsHLLPrecision);
+ }
+
+ //for KYLIN-2518 backward compatibility
+ if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+ isUsePutRowKeyToHllNewAlgorithm = false;
+ hf = Hashing.murmur3_32();
+ logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+ } else {
+ isUsePutRowKeyToHllNewAlgorithm = true;
+ rowHashCodesLong = new long[nRowKey];
+ hf = Hashing.murmur3_128();
+ logger.info(
+ "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+ cubeDesc.getVersion());
+ }
+ }
+
+ JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(input, Text.class, Text.class);
+
+ JavaPairRDD<Text, Text> afterMapRDD = inputRDD.mapPartitionsToPair(
+ new SparkFunction.PairFlatMapFunctionBase<Iterator<Tuple2<Text, Text>>, Text, Text>() {
+ @Override // Builds the row key decoder for the segment being optimized.
+ protected void doInit() {
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); // NOTE(review): not installed as thread-local config here, unlike the driver-side block in execute() - confirm
+ CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ CubeSegment cubeSeg = cubeInstance.getSegmentById(segmentId);
+ rowKeyDecoder = new RowKeyDecoder(cubeSeg); // stored in a field of the enclosing job and used by doCall
+ }
+
+ /**
+ * Feeds samplingPercent out of every 100 base-cuboid rows of this partition into the per-cuboid
+ * HLL counters, then emits one (cuboid id bytes, serialized HLL registers) pair per cuboid.
+ */
+ @Override
+ protected Iterator<Tuple2<Text, Text>> doCall(Iterator<Tuple2<Text, Text>> iterator)
+ throws Exception {
+ while (iterator.hasNext()) {
+ Text key = iterator.next()._1();
+ long cuboidID = rowKeyDecoder.decode(key.getBytes());
+ if (cuboidID != baseCuboidId) {
+ continue; // Skip data from cuboids which are not the base cuboid
+ }
+
+ List<String> keyValues = rowKeyDecoder.getValues();
+
+ if (rowCount < samplingPercent) {
+ Preconditions.checkArgument(nRowKey == keyValues.size());
+
+ String[] row = keyValues.toArray(new String[keyValues.size()]);
+ if (isUsePutRowKeyToHllNewAlgorithm) {
+ putRowKeyToHLLNew(row);
+ } else {
+ putRowKeyToHLLOld(row);
+ }
+ }
+
+ if (++rowCount == 100)
+ rowCount = 0;
+ }
+
+ ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+ List<Tuple2<Text, Text>> result = new ArrayList<>(cuboidIds.length);
+
+ for (int i = 0; i < cuboidIds.length; i++) {
+ HLLCounter hll = allCuboidsHLL[i];
+ Text outputKey = new Text();
+ Text outputValue = new Text();
+
+ outputKey.set(Bytes.toBytes(cuboidIds[i]));
+ hllBuf.clear();
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array(), 0, hllBuf.position());
+
+ // Log only the id and the payload size: the registers are binary, byte[].toString() prints an
+ // identity hash, and dumping the whole result list per iteration was quadratic log output.
+ logger.debug("Cuboid id {} serialized into {} HLL register bytes", cuboidIds[i],
+ hllBuf.position());
+ result.add(new Tuple2<Text, Text>(outputKey, outputValue));
+ }
+ return result.iterator();
+ }
+
+
+ });
+
+ afterMapRDD.groupByKey().foreach(new SparkFunction.VoidFunctionBase<Tuple2<Text, Iterable<Text>>>() {
+ @Override // Loads the Kylin config from metaUrl and installs it as the thread-local config.
+ protected void doInit() {
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig); // NOTE(review): the returned SetAndUnsetThreadLocalConfig is never closed, so the thread-local is not unset here - confirm intended
+ }
+
+ @Override // Merges the serialized HLL registers grouped under one cuboid id and writes partial cuboid statistics.
+ protected void doCall(Tuple2<Text, Iterable<Text>> v1) throws Exception {
+ Text key = v1._1();
+ Iterable<Text> values = v1._2();
+
+ long cuboidId = Bytes.toLong(key.getBytes()); // the key carries the cuboid id as raw long bytes
+ logger.info("Cuboid id to be processed: " + cuboidId);
+
+ List<Long> baseCuboidRowCountInMappers = Lists.newArrayList();
+ long totalRowsBeforeMerge = 0; // local: restarts at 0 for every key
+
+ for (Text value : values) {
+ HLLCounter hll = new HLLCounter(cubeStatsHLLPrecision);
+ ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength()); // bounded by getLength(), not by the backing array size
+ hll.readRegisters(bf);
+
+ if (cuboidId == baseCuboidId) {
+ baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+ }
+
+ totalRowsBeforeMerge += hll.getCountEstimate();
+
+ if (cuboidHLLMap.get(cuboidId) != null) { // cuboidHLLMap is a field of the enclosing job and is not cleared in this method
+ cuboidHLLMap.get(cuboidId).merge(hll);
+ } else {
+ cuboidHLLMap.put(cuboidId, hll);
+ }
+ }
+
+ long grandTotal = 0;
+ for (HLLCounter hll : cuboidHLLMap.values()) { // sums every counter held in the map, not only cuboidId's
+ grandTotal += hll.getCountEstimate();
+ }
+ double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; // NOTE(review): numerator covers this key only while grandTotal spans the whole map - confirm the ratio is intended
+
+ logger.info("writer cuboIdstatic to " + output);
+ CubeStatsWriter.writePartialCuboidStatistics(sConf.get(), new Path(output), cuboidHLLMap,
+ samplingPercent, baseCuboidRowCountInMappers.size(), mapperOverlapRatio, // size() stays 0 unless cuboidId is the base cuboid
+ TaskContext.getPartitionId());
+ }
+ });
+ }
+
+ }
+
+ private void putRowKeyToHLLOld(String[] row) {
+ //generate hash for each row key column
+ byte[][] rowHashCodes = new byte[nRowKey][];
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[i];
+ if (colValue != null) {
+ rowHashCodes[i] = hc.putUnencodedChars(colValue).hash().asBytes();
+ } else {
+ rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+ }
+ }
+
+ // use the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0; i < cuboidIds.length; i++) {
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+ hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]);
+ }
+
+ allCuboidsHLL[i].add(hc.hash().asBytes());
+ }
+ }
+
+ private void putRowKeyToHLLNew(String[] row) {
+ //generate hash for each row key column
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[i];
+ if (colValue == null)
+ colValue = "0";
+ byte[] bytes = hc.putUnencodedChars(colValue).hash().asBytes();
+ rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+ }
+
+ // user the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+ long value = 0;
+ for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+ value += rowHashCodesLong[allCuboidsBitSet[i][position]];
+ }
+ allCuboidsHLL[i].addHashDirectly(value);
+ }
+ }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 4c9c391..ab0a565 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -96,14 +96,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
.isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
- public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
- .withDescription("Cube Segment Id").create("segmentId");
- public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
- .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
.isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
- .withDescription("Hive Intermediate Table").create("hiveTable");
+ .withDescription("Hive Intermediate Table").create(BatchConstants.ARG_HIVE_TABLE);
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
public static final Option OPTION_SHRUNK_INPUT_PATH = OptionBuilder
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.java
new file mode 100644
index 0000000..8857cc8
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+public class SparkCubingByLayerForOpt extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayerForOpt.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID); // arg name now matches the option name, as in the other jobs
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+ .isRequired(true).withDescription("Base cuboid input path").create(BatchConstants.ARG_INPUT); // this job reads cuboid data, not a Hive table
+ public static final Option OPTION_CUBOID_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE).hasArg()
+ .isRequired(true).withDescription("Cuboid Mode").create(BatchConstants.ARG_CUBOID_MODE);
+
+ private Options options;
+
+ public SparkCubingByLayerForOpt() { // registers the six command-line options, all declared as required
+ options = new Options();
+ options.addOption(OPTION_CUBOID_MODE);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_OUTPUT_PATH);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override // Derives the cuboids selected by cuboidMode layer by layer from the RDD read at inputPath and saves each layer under outputPath.
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ String cuboidMode = optionsHelper.getOptionValue(OPTION_CUBOID_MODE);
+
+ SparkConf sparkConf = SparkUtil.setKryoSerializerInConf();
+ sparkConf.setAppName("Kylin_Cubing_For_Optimize_" + cubeName + "_With_Spark");
+
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
+ SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob
+ .loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
+ sc.sc().addSparkListener(jobListener);
+
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration()); // created after the Hadoop configuration was modified above
+ final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); // NOTE(review): second load from metaUrl (first one a few lines up) - confirm both are needed
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+ final Job job = Job.getInstance(sConf.get());
+ SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+ StorageLevel storageLevel = StorageLevel.fromString(envConfig.getSparkStorageLevel());
+ JavaPairRDD<ByteArray, Object[]> baseCuboIdRDD = SparkUtil.getCuboIdRDDFromHdfs(sc, metaUrl, cubeName,
+ cubeSegment, inputPath, cubeDesc.getMeasures().size(), sConf);
+
+ // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
+ final Set<Long> cuboidsByMode = cubeSegment.getCubeInstance().getCuboidsByMode(cuboidMode);
+ final int maxLevel = CuboidUtil.getLongestDepth(cuboidsByMode);
+
+ logger.info("cuboidMode" + cuboidMode); // NOTE(review): label and value are concatenated without a separator
+ logger.info("maxLevel" + maxLevel);
+
+ CuboidScheduler scheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSegment, cuboidMode);
+
+ JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[maxLevel + 1]; // one RDD per layer; index 0 is the input layer
+ allRDDs[0] = baseCuboIdRDD;
+
+ SparkCubingByLayer.BaseCuboidReducerFunction2 reducerFunction2 = new SparkCubingByLayer.BaseCuboidReducerFunction2(
+ cubeName, metaUrl, sConf);
+
+ boolean allNormalMeasure = true;
+ boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()];
+ for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+ needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
+ allNormalMeasure = allNormalMeasure && needAggr[i];
+ }
+
+ if (!allNormalMeasure) { // at least one measure reports onlyAggrInBaseCuboid(): use the reducer that receives needAggr
+ reducerFunction2 = new SparkCubingByLayer.CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr);
+ }
+
+ for (int i = 1; i <= maxLevel; i++) {
+ int partition = SparkUtil.estimateLayerPartitionNum(i, cubeStatsReader, envConfig);
+ allRDDs[i] = allRDDs[i - 1]
+ .flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf, scheduler))
+ .reduceByKey(reducerFunction2, partition);
+ allRDDs[i].persist(storageLevel);
+ saveToHDFS(allRDDs[i], metaUrl, cubeName, cubeSegment, outputPath, i, job);
+ // must do unpersist after allRDDs[level] is created, otherwise this RDD will be recomputed
+ allRDDs[i - 1].unpersist(false);
+ }
+ allRDDs[maxLevel].unpersist(false);
+ logger.info("Finished on calculating needed cuboids For Optimize.");
+ logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+
+ }
+
+ }
+
+ static public class CuboidFlatMap
+ extends SparkFunction.PairFlatMapFunctionBase<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
+
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private CubeSegment cubeSegment;
+ private CubeDesc cubeDesc;
+ private NDCuboidBuilder ndCuboidBuilder;
+ private RowKeySplitter rowKeySplitter;
+ private SerializableConfiguration conf;
+ private CuboidScheduler cuboidScheduler;
+
+ public CuboidFlatMap(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf,
+ CuboidScheduler scheduler) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ this.cuboidScheduler = scheduler;
+ }
+
+ @Override // Loads the cube metadata from metaUrl and builds the helpers that doCall relies on.
+ protected void doInit() {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoClose = KylinConfig.setAndUnsetThreadLocalConfig(kConfig)) { // thread-local config is released when this block exits
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+ this.cubeDesc = cubeInstance.getDescriptor();
+ this.rowKeySplitter = new RowKeySplitter(cubeSegment); // used by doCall to parse the cuboid id and split the key
+ this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment)); // used by doCall to build child keys
+ }
+ }
+
+ /** Emits one (child cuboid row key, measures) pair for every cuboid the scheduler spans from this row's cuboid. */
+ @Override
+ public Iterator<Tuple2<ByteArray, Object[]>> doCall(final Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+ byte[] key = tuple2._1().array();
+ long cuboidId = rowKeySplitter.parseCuboid(key);
+ final List<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
+
+ // Nothing is derived from this cuboid: emit no pairs for this row.
+ if (myChildren == null || myChildren.isEmpty()) {
+ return EMTPY_ITERATOR.iterator();
+ }
+ rowKeySplitter.split(key);
+ final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
+
+ List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList<>(myChildren.size());
+ for (Long child : myChildren) {
+ Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
+ ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid,
+ rowKeySplitter.getSplitBuffers());
+ // Only the key is rebuilt; the measure array is passed through unchanged.
+ tuples.add(new Tuple2<>(result, tuple2._2()));
+ }
+ return tuples.iterator();
+ }
+ }
+
+ private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR = java.util.Collections.emptyList(); // immutable, typed empty result shared by all calls
+
+ protected void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String metaUrl, final String cubeName,
+ final CubeSegment cubeSeg, final String hdfsBaseLocation, final int level, final Job job) throws Exception {
+ final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+ final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+
+ IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOutputFormat();
+ outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, cubeSeg.getCuboidScheduler(), level);
+
+ rdd.mapToPair(new SparkFunction.PairFunctionBase<Tuple2<ByteArray, Object[]>, Text, Text>() {
+ private BufferedMeasureCodec codec;
+
+ @Override
+ protected void doInit() {
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoClose = KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig)) {
+ CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+ codec = new BufferedMeasureCodec(desc.getMeasures());
+ }
+ }
+
+ @Override
+ public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> doCall(
+ Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+ ByteBuffer valueBuf = codec.encode(tuple2._2());
+ org.apache.hadoop.io.Text textResult = new org.apache.hadoop.io.Text();
+ textResult.set(valueBuf.array(), 0, valueBuf.position());
+ return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), textResult);
+ }
+ }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+ logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
+ }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 94f3a4e..71cd5aa 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -68,9 +68,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
.isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
- .withDescription("Cube Segment Id").create("segmentId");
+ .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
- .withDescription("HDFS metadata url").create("metaUrl");
+ .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
.isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 66ae57e..fca1839 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -210,7 +210,7 @@ public class SparkExecutable extends AbstractExecutable {
if (!StringUtils.isEmpty(sparkJobId)) {
return onResumed(sparkJobId, mgr);
} else {
- String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
+ String cubeName = this.getParam(BatchConstants.ARG_CUBE_NAME);
CubeInstance cube;
if (cubeName != null) {
cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
@@ -222,7 +222,7 @@ public class SparkExecutable extends AbstractExecutable {
config = cube.getConfig();
} else {
// when loading hive table, we can't get cube name/config, so we get config from project.
- String projectName = this.getParam(SparkColumnCardinality.OPTION_PRJ.getOpt());
+ String projectName = this.getParam(BatchConstants.ARG_PROJECT);
ProjectInstance projectInst = ProjectManager.getInstance(context.getConfig()).getProject(projectName);
config = projectInst.getConfig();
}
@@ -253,7 +253,7 @@ public class SparkExecutable extends AbstractExecutable {
if (cube != null && !isCreateFlatTable()) {
setAlgorithmLayer();
- String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+ String segmentID = this.getParam(BatchConstants.ARG_SEGMENT_ID);
CubeSegment segment = cube.getSegmentById(segmentID);
Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
dumpMetadata(segment, mergingSeg);
@@ -522,7 +522,7 @@ public class SparkExecutable extends AbstractExecutable {
CubeDescTiretreeGlobalDomainDictUtil.cuboidJob(segment.getCubeDesc(), dumpList);
JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(),
- this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
+ this.getParam(BatchConstants.ARG_META_URL));
}
private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException {
@@ -539,7 +539,7 @@ public class SparkExecutable extends AbstractExecutable {
CubeDescTiretreeGlobalDomainDictUtil.cuboidJob(segment.getCubeDesc(), dumpList);
}
JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(),
- this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
+ this.getParam(BatchConstants.ARG_META_URL));
}
protected void readCounters(final Map<String, String> info) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index a116cc8..d2020f1 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -110,11 +110,11 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
.isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
- .withDescription("HDFS metadata url").create("metaUrl");
+ .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
.isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
- .withDescription("Cube Segment Id").create("segmentId");
+ .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
.withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFilterRecommendCuboidDataJob.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFilterRecommendCuboidDataJob.java
new file mode 100644
index 0000000..704dc8d
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFilterRecommendCuboidDataJob.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.Set;
+
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
+import static org.apache.kylin.engine.spark.SparkUtil.configConvergeCuboidDataReduceOut;
+import static org.apache.kylin.engine.spark.SparkUtil.generateFilePath;
+
+/**
+ * Spark step of the cube optimization job that filters the existing cuboid data of the segment
+ * being optimized. Two outputs are written below the job output path: the rows that belong to the
+ * base cuboid ({@code PathNameCuboidBase}) and the rows that belong to one of the cube's
+ * recommended cuboids ({@code PathNameCuboidOld}).
+ */
+public class SparkFilterRecommendCuboidDataJob extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkFilterRecommendCuboidDataJob.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true)
+            .create(BatchConstants.ARG_SEGMENT_ID);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+
+    private Options options;
+
+    /** Registers the five required command line options of this job. */
+    public SparkFilterRecommendCuboidDataJob() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    /**
+     * Reads the old cuboid files from the input path and saves the base cuboid rows and the
+     * recommended cuboid rows as two separate datasets. The output path is deleted first.
+     *
+     * @param optionsHelper parsed command line holding cube name, segment id, input path,
+     *                      output path and metadata url
+     * @throws IOException if the input location does not exist
+     */
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+
+        SparkConf sparkConf = SparkUtil.setKryoSerializerInConf();
+        sparkConf.setAppName("Kylin_Filter_Recommend_Cuboid_Data_" + cubeName + "_With_Spark");
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
+            sc.sc().addSparkListener(jobListener);
+
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(outputPath));
+
+            CubeManager cubeManager = CubeManager.getInstance(config);
+            CubeInstance cube = cubeManager.getCube(cubeName);
+            CubeSegment optSegment = cube.getSegmentById(segmentId);
+            CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+
+            // captured by the filter functions below, hence final
+            final boolean enableSharding = originalSegment.isEnableSharding();
+            final long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+
+            final Set<Long> recommendCuboids = cube.getCuboidsRecommend();
+            Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
+
+            // The existence check needs a concrete path, so a trailing glob is removed first.
+            FileSystem hdfs = FileSystem.get(sc.hadoopConfiguration());
+            if (!hdfs.exists(new Path(stripTrailingWildcard(inputPath)))) {
+                throw new IOException("OldCuboIdFilePath " + inputPath + " does not exists");
+            }
+
+            // inputPath is oldcuboidRootPath
+            JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class);
+
+            logger.info("start to calculate nBaseReduceTasks");
+            Pair<Integer, Integer> taskNums = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(originalSegment);
+            int reduceTasks = taskNums.getFirst();
+            int nBaseReduceTasks = taskNums.getSecond();
+            logger.info("nBaseReduceTasks is {}", nBaseReduceTasks);
+
+            final Job job = Job.getInstance(sConf.get());
+            SparkUtil.setHadoopConfForCuboid(job, originalSegment, metaUrl);
+
+            // first output: only the rows whose cuboid id is the base cuboid
+            JavaPairRDD<Text, Text> baseCuboIdRDD = inputRDD.filter(new Function<Tuple2<Text, Text>, Boolean>() {
+                @Override
+                public Boolean call(Tuple2<Text, Text> v1) throws Exception {
+                    long cuboidId = RowKeySplitter.getCuboidId(v1._1.getBytes(), enableSharding);
+                    return cuboidId == baseCuboid;
+                }
+            });
+
+            configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidBase, outputPath));
+            baseCuboIdRDD.coalesce(nBaseReduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+            // second output: the rows whose cuboid id is one of the recommended cuboids
+            JavaPairRDD<Text, Text> reuseCuboIdRDD = inputRDD.filter(new Function<Tuple2<Text, Text>, Boolean>() {
+                @Override
+                public Boolean call(Tuple2<Text, Text> v1) throws Exception {
+                    long cuboidId = RowKeySplitter.getCuboidId(v1._1.getBytes(), enableSharding);
+                    return recommendCuboids.contains(cuboidId);
+                }
+            });
+
+            configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidOld, outputPath));
+            reuseCuboIdRDD.coalesce(reduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+        }
+    }
+
+    /**
+     * Returns {@code path} without a single trailing {@code *}. A path that does not end with the
+     * wildcard is returned unchanged instead of losing its last character.
+     */
+    private static String stripTrailingWildcard(String path) {
+        return path.endsWith("*") ? path.substring(0, path.length() - 1) : path;
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFunction.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFunction.java
new file mode 100644
index 0000000..39917a3
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFunction.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * Abstract adapters for Spark's Java function interfaces. Each adapter runs a one-time
+ * {@code doInit()} before the first record it handles and then delegates every call to
+ * {@code doCall(...)}.
+ */
+public class SparkFunction {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkFunction.class);
+
+    /**
+     * Shared lazy-initialization and progress-logging logic of all adapters in this class.
+     */
+    private abstract static class FunctionBase implements Serializable {
+        // Both fields are transient, so a deserialized copy starts uninitialized with a zero counter.
+        private transient volatile boolean initialized = false;
+        private transient int recordCounter;
+
+        /** One-time setup hook, invoked by {@link #init()} before the first record is processed. */
+        protected abstract void doInit();
+
+        /**
+         * Runs {@link #doInit()} if it has not run yet on this instance, then counts the record and
+         * logs progress and available memory once every
+         * {@code SparkUtil.getNormalRecordLogThreshold()} records.
+         */
+        protected void init() {
+            if (!initialized) {
+                // Class-wide lock: doInit() of different function instances never runs concurrently.
+                synchronized (SparkFunction.class) {
+                    if (!initialized) {
+                        logger.info("Start to do init for {}", this);
+                        doInit();
+                        // Reset the counter BEFORE publishing the volatile flag, so a thread that
+                        // skips the lock after seeing the flag cannot have its count reset under it.
+                        recordCounter = 0;
+                        initialized = true;
+                    }
+                }
+            }
+            if (recordCounter++ % SparkUtil.getNormalRecordLogThreshold() == 0) {
+                logger.info("Accepting record with ordinal: {}", recordCounter);
+                logger.info("Do call, available memory: {}m", MemoryBudgetController.getSystemAvailMB());
+            }
+        }
+    }
+
+    /** {@link PairFunction} that calls {@code init()} before delegating to {@link #doCall(Object)}. */
+    public abstract static class PairFunctionBase<T, K, V> extends FunctionBase implements PairFunction<T, K, V> {
+
+        protected abstract Tuple2<K, V> doCall(T t) throws Exception;
+
+        @Override
+        public Tuple2<K, V> call(T t) throws Exception {
+            init();
+            return doCall(t);
+        }
+    }
+
+    /** {@link Function2} that calls {@code init()} before delegating to {@link #doCall(Object, Object)}. */
+    public abstract static class Function2Base<T1, T2, R> extends FunctionBase implements Function2<T1, T2, R> {
+
+        protected abstract R doCall(T1 v1, T2 v2) throws Exception;
+
+        @Override
+        public R call(T1 v1, T2 v2) throws Exception {
+            init();
+            return doCall(v1, v2);
+        }
+    }
+
+    /** {@link PairFlatMapFunction} that calls {@code init()} before delegating to {@link #doCall(Object)}. */
+    public abstract static class PairFlatMapFunctionBase<T, K, V> extends FunctionBase implements PairFlatMapFunction<T, K, V> {
+
+        protected abstract Iterator<Tuple2<K, V>> doCall(T t) throws Exception;
+
+        @Override
+        public Iterator<Tuple2<K, V>> call(T t) throws Exception {
+            init();
+            return doCall(t);
+        }
+    }
+
+    /** {@link VoidFunction} that calls {@code init()} before delegating to {@link #doCall(Object)}. */
+    public abstract static class VoidFunctionBase<T> extends FunctionBase implements VoidFunction<T> {
+
+        protected abstract void doCall(T t) throws Exception;
+
+        @Override
+        public void call(T t) throws Exception {
+            init();
+            doCall(t);
+        }
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index ddd0755..4a4bbb0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -82,9 +82,9 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
.isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
- .withDescription("Cube Segment Id").create("segmentId");
+ .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
- .withDescription("HDFS metadata url").create("metaUrl");
+ .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg()
.isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds");
public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg()
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java
index 0662abb..f1c4419 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java
@@ -76,11 +76,11 @@ public class SparkUHCDictionary extends AbstractApplication implements Serializa
public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
.isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
- .withDescription("HDFS metadata url").create("metaUrl");
+ .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
.isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
- .withDescription("Cube Segment Id").create("segmentId");
+ .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
public static final Option OPTION_CUBING_JOB_ID = OptionBuilder
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.java
new file mode 100644
index 0000000..f153459
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.Serializable;
+
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
+import static org.apache.kylin.engine.spark.SparkUtil.configConvergeCuboidDataReduceOut;
+import static org.apache.kylin.engine.spark.SparkUtil.generateFilePath;
+
+/**
+ * Spark step of the cube optimization job that re-encodes the row keys of the previously filtered
+ * base cuboid and old cuboid data. Every key is split with a splitter built for the original
+ * segment and encoded again with an encoder obtained for the optimize segment; the values are
+ * passed through unchanged.
+ */
+public class SparkUpdateShardForOldCuboidDataStep extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkUpdateShardForOldCuboidDataStep.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true)
+            .create(BatchConstants.ARG_SEGMENT_ID);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+
+    private Options options;
+    private CubeDesc cubeDesc;
+    // Both are assigned in initMethod(), i.e. when a function processes its first record.
+    private RowKeySplitter rowKeySplitter;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
+
+    // Scratch buffers reused for every record by buildKey().
+    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
+    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+
+    /** Registers the five required command line options of this step. */
+    public SparkUpdateShardForOldCuboidDataStep() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_META_URL);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    /**
+     * Reads the base cuboid and old cuboid sequence files below the input path, rewrites the key
+     * of every record and saves both datasets below the output path, which is deleted first.
+     *
+     * @param optionsHelper parsed command line holding cube name, segment id, input path,
+     *                      output path and metadata url
+     */
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        // input is optimizeCuboidRootPath + "*", output is cuboidRootPath.
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+
+        String baseCuboIdInputPath = inputPath + PathNameCuboidBase;
+        String oldCuboIdInputPath = inputPath + PathNameCuboidOld;
+
+        SparkConf sparkConf = SparkUtil.setKryoSerializerInConf();
+        sparkConf.setAppName("Update_Old_Cuboid_Shard_for_Optimization" + cubeName + "_With_Spark");
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
+            sc.sc().addSparkListener(jobListener);
+
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(outputPath));
+
+            CubeManager cubeManager = CubeManager.getInstance(config);
+            CubeInstance cube = cubeManager.getCube(cubeName);
+            CubeSegment optSegment = cube.getSegmentById(segmentId);
+            CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+            // let the optimize segment use the same cube instance as the original segment
+            optSegment.setCubeInstance(originalSegment.getCubeInstance());
+
+            JavaPairRDD<Text, Text> baseCuboIdRDD = sc.sequenceFile(baseCuboIdInputPath, Text.class, Text.class);
+            JavaPairRDD<Text, Text> oldCuboIdRDD = sc.sequenceFile(oldCuboIdInputPath, Text.class, Text.class);
+
+            cubeDesc = cube.getDescriptor();
+
+            logger.info("start to calculate nBaseReduceTasks");
+            Pair<Integer, Integer> taskNums = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(originalSegment);
+            int reduceTasks = taskNums.getFirst();
+            int nBaseReduceTasks = taskNums.getSecond();
+            logger.info("nBaseReduceTasks is {}", nBaseReduceTasks);
+
+            final Job job = Job.getInstance(sConf.get());
+            SparkUtil.setHadoopConfForCuboid(job, originalSegment, metaUrl);
+
+            //UpdateCuboidShard for baseCuboId
+            JavaPairRDD<Text, Text> mapBaseCuboIdRDD = baseCuboIdRDD
+                    .mapToPair(newUpdateShardFunction(sConf, metaUrl, cubeName, optSegment, originalSegment));
+
+            configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidBase, outputPath));
+            mapBaseCuboIdRDD.repartition(nBaseReduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+            //UpdateCuboidShard for oldCuboIds
+            JavaPairRDD<Text, Text> mapOldCuboIdRDD = oldCuboIdRDD
+                    .mapToPair(newUpdateShardFunction(sConf, metaUrl, cubeName, optSegment, originalSegment));
+
+            configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidOld, outputPath));
+            mapOldCuboIdRDD.repartition(reduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+        }
+    }
+
+    /**
+     * Creates the function that rewrites the key of one cuboid record. Each call returns a new
+     * function instance; the base cuboid and the old cuboid data use the same logic.
+     */
+    private SparkFunction.PairFunctionBase<Tuple2<Text, Text>, Text, Text> newUpdateShardFunction(
+            final SerializableConfiguration sConf, final String metaUrl, final String cubeName,
+            final CubeSegment optSegment, final CubeSegment originalSegment) {
+        return new SparkFunction.PairFunctionBase<Tuple2<Text, Text>, Text, Text>() {
+            @Override
+            protected void doInit() {
+                initMethod(sConf, metaUrl, cubeName, optSegment, originalSegment);
+            }
+
+            @Override
+            protected Tuple2<Text, Text> doCall(Tuple2<Text, Text> tuple2) throws Exception {
+                Text outputKey = new Text();
+                long cuboidID = rowKeySplitter.split(tuple2._1.getBytes());
+
+                Cuboid cuboid = new Cuboid(cubeDesc, cuboidID, cuboidID);
+                int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers());
+                outputKey.set(newKeyBuf.array(), 0, fullKeySize);
+                return new Tuple2<>(outputKey, tuple2._2);
+            }
+        };
+    }
+
+    /**
+     * Copies the body parts of the most recently split row key into {@code newKeyBodyBuf} and
+     * encodes them into {@code newKeyBuf} with the encoder obtained for {@code cuboid}.
+     *
+     * @param cuboid       cuboid the record belongs to
+     * @param splitBuffers split buffers produced by {@code rowKeySplitter} for the current key
+     * @return number of bytes of the encoded key held in {@code newKeyBuf}
+     */
+    private int buildKey(Cuboid cuboid, ByteArray[] splitBuffers) {
+        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
+
+        int startIdx = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+        int endIdx = startIdx + Long.bitCount(cuboid.getId());
+        int offset = 0;
+        for (int i = startIdx; i < endIdx; i++) {
+            System.arraycopy(splitBuffers[i].array(), splitBuffers[i].offset(), newKeyBodyBuf, offset,
+                    splitBuffers[i].length());
+            offset += splitBuffers[i].length();
+        }
+
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        // Grow from the capacity of the backing array. The logical length is overwritten by
+        // setLength() on every call, so doubling it could shrink the buffer or stall at zero.
+        while (newKeyBuf.array().length < fullKeySize) {
+            newKeyBuf = new ByteArray(newKeyBuf.array().length * 2);
+        }
+        newKeyBuf.setLength(fullKeySize);
+
+        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
+
+        return fullKeySize;
+    }
+
+    /**
+     * One-time setup run from {@code doInit()}: loads the Kylin config from {@code metaUrl},
+     * looks up both segments again by uuid in the cube loaded from that config, and creates the
+     * row key splitter (original segment) and the row key encoder provider (optimize segment).
+     */
+    private void initMethod(SerializableConfiguration sConf, String metaUrl, String cubeName, CubeSegment optSegment, CubeSegment originalSegment) {
+        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+        KylinConfig.setKylinConfigInEnvIfMissing(kylinConfig.exportToProperties());
+        CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        CubeSegment originalSeg = cubeInstance.getSegmentById(originalSegment.getUuid());
+        CubeSegment optSeg = cubeInstance.getSegmentById(optSegment.getUuid());
+        rowKeySplitter = new RowKeySplitter(originalSeg);
+        rowKeyEncoderProvider = new RowKeyEncoderProvider(optSeg);
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index d146c85..66ff734 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.spark;
import java.io.DataInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -30,22 +32,31 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.ShrunkenDictionary;
import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -61,11 +72,16 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Tuple2;
public class SparkUtil {
private static final Logger logger = LoggerFactory.getLogger(SparkUtil.class);
+ /**
+  * Returns the record interval used for periodic progress logging.
+  *
+  * @return always 1000
+  */
+ public static int getNormalRecordLogThreshold() {
+     final int recordsPerLogLine = 1000;
+     return recordsPerLogLine;
+ }
+
public static ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
return (ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
@@ -83,8 +99,8 @@ public class SparkUtil {
return (ISparkBatchMergeInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
}
- public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
- return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
+ public static ISparkOutput.ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide2(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchOptimizeOutputSide(seg);
}
/**
@@ -228,4 +244,81 @@ public class SparkUtil {
return dictionaryMap;
}
+
+    /**
+     * Creates a new {@link SparkConf} configured for Kryo serialization.
+     * <p>
+     * The returned conf sets {@code spark.serializer} to the Kryo serializer, points
+     * {@code spark.kryo.registrator} at {@code KylinKryoRegistrator}, sets
+     * {@code spark.kryo.registrationRequired} to {@code true}, and registers a few extra classes
+     * that are resolved by name through {@link Class#forName(String)}.
+     *
+     * @return a freshly created SparkConf carrying the serialization settings
+     * @throws ClassNotFoundException if any of the classes looked up by name cannot be loaded
+     */
+    public static SparkConf setKryoSerializerInConf() throws ClassNotFoundException {
+        // Declared as Class<?>[] (not the raw Class[]) to match the array that is actually created.
+        Class<?>[] kryoClassArray = new Class<?>[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey"),
+                Class.forName("scala.collection.mutable.WrappedArray$ofRef"),
+                Class.forName("org.apache.hadoop.io.Text") };
+
+        SparkConf sparkConf = new SparkConf();
+
+        //serialization conf
+        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        sparkConf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        sparkConf.set("spark.kryo.registrationRequired", "true");
+        sparkConf.registerKryoClasses(kryoClassArray);
+
+        return sparkConf;
+    }
+
+    /**
+     * Builds an output file path by appending {@code subOutputPath} directly to {@code jobOutPath}
+     * (no separator is inserted) and logs the result.
+     *
+     * @param subOutputPath the suffix appended after the job output path
+     * @param jobOutPath the job output path used as the prefix
+     * @return {@code jobOutPath} immediately followed by {@code subOutputPath}
+     */
+    public static String generateFilePath(String subOutputPath, String jobOutPath) {
+        // Format once and reuse the value for both the log line and the return value.
+        final String filePath = String.format(Locale.ROOT, "%s%s", jobOutPath, subOutputPath);
+        logger.info("ConvergeCuboidDataReduce Out Path: {}", filePath);
+        return filePath;
+    }
+
+    /**
+     * Reads a (Text, Text) sequence file from {@code inputPath} and maps every record through
+     * {@link ReEncodeCuboidFunction}, yielding pairs of {@code ByteArray} key and decoded
+     * {@code Object[]} values.
+     *
+     * @param sc the Spark context used to open the sequence file
+     * @param metaUrl metadata url handed to the re-encode function
+     * @param cubeName name of the cube handed to the re-encode function
+     * @param seg segment whose uuid is handed to the re-encode function
+     * @param inputPath path of the sequence file to read
+     * @param measureNum size of the value array produced for each record
+     * @param sConf serializable configuration handed to the re-encode function
+     * @return the re-encoded pair RDD
+     */
+    public static JavaPairRDD<ByteArray, Object[]> getCuboIdRDDFromHdfs(JavaSparkContext sc, final String metaUrl,
+            final String cubeName, final CubeSegment seg, final String inputPath, final int measureNum,
+            SerializableConfiguration sConf) {
+
+        final JavaPairRDD<Text, Text> encodedCuboids = sc.sequenceFile(inputPath, Text.class, Text.class);
+        // re-encode
+        final ReEncodeCuboidFunction reEncoder = new ReEncodeCuboidFunction(cubeName, seg.getUuid(), metaUrl, sConf,
+                measureNum);
+        return encodedCuboids.mapToPair(reEncoder);
+    }
+
+    /**
+     * Pair function that converts one (Text key, Text value) record into a
+     * ({@code ByteArray} key, decoded {@code Object[]} measures) pair.
+     * <p>
+     * The measure codec is transient and is built in {@link #doInit()} from the cube description
+     * loaded through the Kylin config at {@code metaUrl}.
+     * NOTE(review): assumes the base class invokes {@code doInit()} before the first
+     * {@code doCall()}; confirm in SparkFunction.PairFunctionBase.
+     */
+    static class ReEncodeCuboidFunction
+            extends SparkFunction.PairFunctionBase<Tuple2<Text, Text>, ByteArray, Object[]> {
+        private String cubeName;
+        // Kept for constructor compatibility; not read by this class.
+        private String sourceSegmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int measureNum;
+        private transient KylinConfig kylinConfig;
+        private transient BufferedMeasureCodec segmentReEncoder = null;
+
+        /**
+         * @param cubeName name of the cube whose measures are decoded
+         * @param sourceSegmentId id of the source segment
+         * @param metaUrl metadata url used to load the Kylin config
+         * @param conf serializable configuration used to load the Kylin config
+         * @param measureNum length of the value array allocated for each record
+         */
+        ReEncodeCuboidFunction(String cubeName, String sourceSegmentId, String metaUrl, SerializableConfiguration conf,
+                int measureNum) {
+            this.cubeName = cubeName;
+            this.sourceSegmentId = sourceSegmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.measureNum = measureNum;
+        }
+
+        /**
+         * Loads the Kylin config and creates the measure codec from the cube's measure definitions.
+         */
+        @Override
+        protected void doInit() {
+            this.kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cube.getDescName());
+                this.segmentReEncoder = new BufferedMeasureCodec(cubeDesc.getMeasures());
+            }
+        }
+
+        /**
+         * Decodes the value bytes into measure objects and copies the key bytes into a ByteArray.
+         *
+         * @param textTextTuple2 the (key, value) record read from the sequence file
+         * @return the key as a ByteArray paired with the decoded measure values
+         */
+        @Override
+        public Tuple2<ByteArray, Object[]> doCall(Tuple2<Text, Text> textTextTuple2) throws Exception {
+            final Text key = textTextTuple2._1();
+            final Text value = textTextTuple2._2();
+
+            Object[] result = new Object[measureNum];
+            segmentReEncoder.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), result);
+
+            // Text.getBytes() exposes the whole backing array, of which only getLength() bytes are valid.
+            // copyBytes() returns exactly the valid bytes in a fresh array, so the key carries no stale
+            // padding and does not alias a buffer the record reader may reuse.
+            return new Tuple2<ByteArray, Object[]>(new ByteArray(key.copyBytes()), result);
+        }
+    }
+
+    /**
+     * Registers {@code output} as the job's output path and then deletes that path through
+     * {@code HadoopUtil.deletePath} using the job's configuration.
+     *
+     * @param job the Hadoop job whose output path is set
+     * @param output the output location as a string
+     * @throws IOException if setting or deleting the path fails
+     */
+    public static void configConvergeCuboidDataReduceOut(Job job, String output) throws IOException {
+        final Path target = new Path(output);
+        // Order matters: the path is registered on the job first, then removed from the file system.
+        FileOutputFormat.setOutputPath(job, target);
+        HadoopUtil.deletePath(job.getConfiguration(), target);
+    }
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
index 03ae3f2..1ea5f2c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
@@ -98,7 +98,7 @@ public class BulkLoadJob extends AbstractHadoopJob {
int ret = 0;
if (count > 0) {
- logger.debug("Start to run LoadIncrementalHFiles");
+ logger.debug("Start to run LoadIncrementalHFiles, File count is: " + count);
ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
logger.debug("End to run LoadIncrementalHFiles");
return ret;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index cfeec57..e9af5e0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -102,6 +102,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
// for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled
Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
+ logger.info("CreateHTableJob buildingCuboids size: " + buildingCuboids.size());
Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size());
for (Long cuboid : buildingCuboids) {
Double cuboidSize = cuboidSizeMap.get(cuboid);
@@ -109,6 +110,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
logger.warn("{} cuboid's size is null will replace by 0", cuboid);
cuboidSize = 0.0;
}
+ logger.info("Cuboid:" + cuboid + " size: " + cuboidSize);
optimizedCuboidSizeMap.put(cuboid, cuboidSize);
}
cuboidSizeMap = optimizedCuboidSizeMap;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
index 3702a92..b0b47dd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -21,6 +21,7 @@ package org.apache.kylin.storage.hbase.steps;
import java.util.List;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.engine.spark.ISparkOutput;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.slf4j.Logger;
@@ -98,6 +99,29 @@ public class HBaseSparkOutputTransition implements ISparkOutput {
}
public ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide(final CubeSegment seg) {
- return null;
+ return new ISparkBatchOptimizeOutputSide() { // replaces the former null return with an anonymous implementation
+ HBaseSparkSteps steps = new HBaseSparkSteps(seg); // step factory bound to the given segment
+
+ @Override
+ public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId(), CuboidModeEnum.RECOMMEND)); // HTable is created with the RECOMMEND cuboid mode
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId())); // first convert cuboid data to HFiles
+ jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); // then add the bulk-load step
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ steps.addOptimizeGarbageCollectionSteps(jobFlow); // delegates to the optimize garbage-collection steps
+ }
+
+ @Override
+ public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) {
+ steps.addCheckpointGarbageCollectionSteps(jobFlow); // delegates to the checkpoint garbage-collection steps
+ }
+ };
}
}
\ No newline at end of file
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index d458b8c..807d23f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -82,9 +82,9 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
.isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
- .withDescription("Cube Segment Id").create("segmentId");
+ .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
- .withDescription("HDFS metadata url").create("metaUrl");
+ .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
.isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()