You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/03/11 08:00:59 UTC
[1/2] kylin git commit: clean code
Repository: kylin
Updated Branches:
refs/heads/master 4662adab7 -> 5e13bba08
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 0a6c123..0b45795 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -22,12 +22,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -38,7 +34,6 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
@@ -52,16 +47,12 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class MergeStatisticsStep extends AbstractExecutable {
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String MERGING_SEGMENT_IS = "mergingSegmentIds";
- private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
public MergeStatisticsStep() {
@@ -73,16 +64,16 @@ public class MergeStatisticsStep extends AbstractExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig kylinConf = context.getConfig();
final CubeManager mgr = CubeManager.getInstance(kylinConf);
- final CubeInstance cube = mgr.getCube(getCubeName());
- final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+ final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
Configuration conf = HadoopUtil.getCurrentConfiguration();
ResourceStore rs = ResourceStore.getStore(kylinConf);
try {
int averageSamplingPercentage = 0;
- for (String segmentId : this.getMergingSegmentIds()) {
- String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
+ for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
+ String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
InputStream is = rs.getResource(fileKey).inputStream;
File tempFile = null;
FileOutputStream tempFileStream = null;
@@ -126,9 +117,9 @@ public class MergeStatisticsStep extends AbstractExecutable {
tempFile.delete();
}
}
- averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
- CuboidStatsUtil.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
- Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+ averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
+ CuboidStatsUtil.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
+ Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = statisticsFilePath.getFileSystem(conf);
FSDataInputStream is = fs.open(statisticsFilePath);
try {
@@ -146,45 +137,4 @@ public class MergeStatisticsStep extends AbstractExecutable {
}
}
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setMergingSegmentIds(List<String> ids) {
- setParam(MERGING_SEGMENT_IS, StringUtils.join(ids, ","));
- }
-
- private List<String> getMergingSegmentIds() {
- final String ids = getParam(MERGING_SEGMENT_IS);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id : splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
- public void setMergedStatisticsPath(String path) {
- setParam(MERGED_STATISTICS_PATH, path);
- }
-
- private String getMergedStatisticsPath() {
- return getParam(MERGED_STATISTICS_PATH);
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 1dbce8e..ff9be44 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -131,7 +131,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
if (myChildren == null || myChildren.size() == 0) {
context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
skipCounter++;
- if (skipCounter % BatchConstants.COUNTER_MAX == 0) {
+ if (skipCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
logger.info("Skipped " + skipCounter + " records!");
}
return;
@@ -140,7 +140,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
handleCounter++;
- if (handleCounter % BatchConstants.COUNTER_MAX == 0) {
+ if (handleCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
logger.info("Handled " + handleCounter + " records!");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 9314b88..288ca6a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -47,11 +47,6 @@ import org.apache.kylin.metadata.model.MeasureDesc;
*/
public class SaveStatisticsStep extends AbstractExecutable {
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String STATISTICS_PATH = "statisticsPath";
- private static final String CUBING_JOB_ID = "cubingJobId";
-
public SaveStatisticsStep() {
super();
}
@@ -60,15 +55,15 @@ public class SaveStatisticsStep extends AbstractExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig kylinConf = context.getConfig();
final CubeManager mgr = CubeManager.getInstance(kylinConf);
- final CubeInstance cube = mgr.getCube(getCubeName());
- final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+ final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
ResourceStore rs = ResourceStore.getStore(kylinConf);
try {
- Path statisticsFilePath = new Path(getStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+ Path statisticsFilePath = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
if (!fs.exists(statisticsFilePath))
- throw new IOException("File " + statisticsFilePath + " does not exists;");
+ throw new IOException("File " + statisticsFilePath + " does not exists");
FSDataInputStream is = fs.open(statisticsFilePath);
try {
@@ -105,7 +100,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
break;
}
}
-
+
if (memoryHungry == true) {
alg = AlgorithmEnum.LAYER;
} else if ("random".equalsIgnoreCase(algPref)) { // for testing
@@ -120,40 +115,8 @@ public class SaveStatisticsStep extends AbstractExecutable {
}
logger.info("The cube algorithm for " + seg + " is " + alg);
- CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+ CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
cubingJob.setAlgorithm(alg);
}
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setStatisticsPath(String path) {
- this.setParam(STATISTICS_PATH, path);
- }
-
- private String getStatisticsPath() {
- return getParam(STATISTICS_PATH);
- }
-
- public void setCubingJobId(String id) {
- setParam(CUBING_JOB_ID, id);
- }
-
- private String getCubingJobId() {
- return getParam(CUBING_JOB_ID);
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index a10fef4..c41aaf1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -33,50 +33,22 @@ import org.apache.kylin.job.execution.ExecuteResult;
*/
public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
- private static final String SEGMENT_ID = "segmentId";
- private static final String CUBE_NAME = "cubeName";
- private static final String CUBING_JOB_ID = "cubingJobId";
-
public UpdateCubeInfoAfterBuildStep() {
super();
}
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setCubingJobId(String id) {
- setParam(CUBING_JOB_ID, id);
- }
-
- private String getCubingJobId() {
- return getParam(CUBING_JOB_ID);
- }
-
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(getCubeName());
- final CubeSegment segment = cube.getSegmentById(getSegmentId());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
- CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+ CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
long sourceCount = cubingJob.findSourceRecordCount();
long sourceSizeBytes = cubingJob.findSourceSizeBytes();
long cubeSizeBytes = cubingJob.findCubeSizeBytes();
- segment.setLastBuildJobID(getCubingJobId());
+ segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
segment.setLastBuildTime(System.currentTimeMillis());
segment.setSizeKB(cubeSizeBytes / 1024);
segment.setInputRecords(sourceCount);
@@ -90,5 +62,5 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 527572b..d3ed68a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -40,10 +40,6 @@ import com.google.common.collect.Lists;
*/
public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
- private static final String CUBING_JOB_ID = "cubingJobId";
private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -53,18 +49,18 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeInstance cube = cubeManager.getCube(getCubeName());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
+ CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
if (mergedSegment == null) {
- return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
+ return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
}
- CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+ CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
long cubeSizeBytes = cubingJob.findCubeSizeBytes();
// collect source statistics
- List<String> mergingSegmentIds = getMergingSegmentIds();
+ List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
if (mergingSegmentIds.isEmpty()) {
return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
}
@@ -80,7 +76,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
mergedSegment.setSizeKB(cubeSizeBytes / 1024);
mergedSegment.setInputRecords(sourceCount);
mergedSegment.setInputRecordsSize(sourceSize);
- mergedSegment.setLastBuildJobID(getCubingJobId());
+ mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
mergedSegment.setLastBuildTime(System.currentTimeMillis());
try {
@@ -92,45 +88,4 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
}
}
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setMergingSegmentIds(List<String> ids) {
- setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
- }
-
- private List<String> getMergingSegmentIds() {
- final String ids = getParam(MERGING_SEGMENT_IDS);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id : splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
- public void setCubingJobId(String id) {
- setParam(CUBING_JOB_ID, id);
- }
-
- private String getCubingJobId() {
- return getParam(CUBING_JOB_ID);
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 285729f..981dac3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ClassUtil;
@@ -103,7 +104,7 @@ import java.util.concurrent.LinkedBlockingQueue;
public class SparkCubing extends AbstractApplication {
private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
- private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("cube").hasArg().isRequired(true).withDescription("Cube Name").create("cubeName");
+ private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
index 9f17d60..05246f4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
@@ -18,12 +18,12 @@
package org.apache.kylin.engine.spark;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.slf4j.Logger;
@@ -48,7 +48,7 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
}
public DefaultChainedExecutable build() {
- final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
+ final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
final String jobId = result.getId();
inputSide.addStepPhase1_CreateFlatTable(result);
@@ -59,7 +59,7 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
final SparkExecutable sparkExecutable = new SparkExecutable();
sparkExecutable.setClassName(SparkCubing.class.getName());
sparkExecutable.setParam("hiveTable", tableName);
- sparkExecutable.setParam("cubeName", seg.getRealization().getName());
+ sparkExecutable.setParam(CubingExecutableUtil.CUBE_NAME, seg.getRealization().getName());
sparkExecutable.setParam("segmentId", seg.getUuid());
sparkExecutable.setParam("confPath", confPath);
sparkExecutable.setParam("coprocessor", coprocessor);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 1c3e71a..5fea710 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.invertedindex.IIDescManager;
import org.apache.kylin.invertedindex.IIManager;
@@ -100,40 +101,7 @@ public abstract class BasicService {
}
protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, final Map<String, Output> allOutputs) {
- List<CubingJob> results = Lists.newArrayList(FluentIterable.from(getExecutableManager().getAllExecutables()).filter(new Predicate<AbstractExecutable>() {
- @Override
- public boolean apply(AbstractExecutable executable) {
- if (executable instanceof CubingJob) {
- if (cubeName == null) {
- return true;
- }
- return ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName);
- } else {
- return false;
- }
- }
- }).transform(new Function<AbstractExecutable, CubingJob>() {
- @Override
- public CubingJob apply(AbstractExecutable executable) {
- return (CubingJob) executable;
- }
- }).filter(new Predicate<CubingJob>() {
- @Override
- public boolean apply(CubingJob executable) {
- if (null == projectName || null == getProjectManager().getProject(projectName)) {
- return true;
- } else {
- ProjectInstance project = getProjectManager().getProject(projectName);
- return project.containsRealization(RealizationType.CUBE, executable.getCubeName());
- }
- }
- }).filter(new Predicate<CubingJob>() {
- @Override
- public boolean apply(CubingJob executable) {
- return statusList.contains(allOutputs.get(executable.getId()).getState());
- }
- }));
- return results;
+ return listAllCubingJobs(cubeName, projectName, statusList, -1L, -1L, allOutputs);
}
protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, final Map<String, Output> allOutputs) {
@@ -144,7 +112,7 @@ public abstract class BasicService {
if (cubeName == null) {
return true;
}
- return ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName);
+ return CubingExecutableUtil.getCubeName(executable.getParams()).equalsIgnoreCase(cubeName);
} else {
return false;
}
@@ -161,7 +129,7 @@ public abstract class BasicService {
return true;
} else {
ProjectInstance project = getProjectManager().getProject(projectName);
- return project.containsRealization(RealizationType.CUBE, executable.getCubeName());
+ return project.containsRealization(RealizationType.CUBE, CubingExecutableUtil.getCubeName(executable.getParams()));
}
}
}).filter(new Predicate<CubingJob>() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 155593a..cd8eef9 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -19,7 +19,13 @@
package org.apache.kylin.rest.service;
import java.io.IOException;
-import java.util.*;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.cube.CubeInstance;
@@ -30,6 +36,7 @@ import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.JobStatusEnum;
@@ -84,7 +91,7 @@ public class JobService extends BasicService {
}
public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
- Calendar calendar= Calendar.getInstance();
+ Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date());
long currentTimeMillis = calendar.getTimeInMillis();
long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter);
@@ -153,22 +160,22 @@ public class JobService extends BasicService {
private long getTimeStartInMillis(Calendar calendar, JobTimeFilterEnum timeFilter) {
switch (timeFilter) {
- case LAST_ONE_DAY:
- calendar.add(Calendar.DAY_OF_MONTH, -1);
- return calendar.getTimeInMillis();
- case LAST_ONE_WEEK:
- calendar.add(Calendar.WEEK_OF_MONTH, -1);
- return calendar.getTimeInMillis();
- case LAST_ONE_MONTH:
- calendar.add(Calendar.MONTH, -1);
- return calendar.getTimeInMillis();
- case LAST_ONE_YEAR:
- calendar.add(Calendar.YEAR, -1);
- return calendar.getTimeInMillis();
- case ALL:
- return 0;
- default:
- throw new RuntimeException("illegal timeFilter for job history:" + timeFilter);
+ case LAST_ONE_DAY:
+ calendar.add(Calendar.DAY_OF_MONTH, -1);
+ return calendar.getTimeInMillis();
+ case LAST_ONE_WEEK:
+ calendar.add(Calendar.WEEK_OF_MONTH, -1);
+ return calendar.getTimeInMillis();
+ case LAST_ONE_MONTH:
+ calendar.add(Calendar.MONTH, -1);
+ return calendar.getTimeInMillis();
+ case LAST_ONE_YEAR:
+ calendar.add(Calendar.YEAR, -1);
+ return calendar.getTimeInMillis();
+ case ALL:
+ return 0;
+ default:
+ throw new RuntimeException("illegal timeFilter for job history:" + timeFilter);
}
}
@@ -263,8 +270,8 @@ public class JobService extends BasicService {
CubingJob cubeJob = (CubingJob) job;
final JobInstance result = new JobInstance();
result.setName(job.getName());
- result.setRelatedCube(cubeJob.getCubeName());
- result.setRelatedSegment(cubeJob.getSegmentIds());
+ result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+ result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
result.setLastModified(cubeJob.getLastModified());
result.setSubmitter(cubeJob.getSubmitter());
result.setUuid(cubeJob.getId());
@@ -288,8 +295,8 @@ public class JobService extends BasicService {
Output output = outputs.get(job.getId());
final JobInstance result = new JobInstance();
result.setName(job.getName());
- result.setRelatedCube(cubeJob.getCubeName());
- result.setRelatedSegment(cubeJob.getSegmentIds());
+ result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+ result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
result.setLastModified(output.getLastModified());
result.setSubmitter(cubeJob.getSubmitter());
result.setUuid(cubeJob.getId());
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index cbfd8c3..68b6ae4 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -60,7 +60,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
bindCurrentConfiguration(conf);
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
- String tableName = conf.get(BatchConstants.TABLE_NAME);
+ String tableName = conf.get(BatchConstants.CFG_TABLE_NAME);
tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName);
tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 70286ab..9162208 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -36,7 +36,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.TableDesc;
/**
@@ -79,7 +78,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
setJobClasspath(job);
String table = getOptionValue(OPTION_TABLE);
- job.getConfiguration().set(BatchConstants.TABLE_NAME, table);
+ job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
FileOutputFormat.setOutputPath(job, output);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index c2e2e64..6d77240 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -154,30 +154,7 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
- byte[] startRow = Bytes.toBytes(rangeStart);
- byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
-
- Scan scan = new Scan(startRow, endRow);
- scan.addColumn(B_FAMILY, B_COLUMN_TS);
- scan.addColumn(B_FAMILY, B_COLUMN);
- tuneScanParameters(scan);
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- List<RawResource> result = Lists.newArrayList();
- try {
- ResultScanner scanner = table.getScanner(scan);
- for (Result r : scanner) {
- result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
- }
- } catch (IOException e) {
- for (RawResource rawResource : result) {
- IOUtils.closeQuietly(rawResource.inputStream);
- }
- throw e;
- } finally {
- IOUtils.closeQuietly(table);
- }
- return result;
+ return getAllResources(rangeStart, rangeEnd, -1L, -1L);
}
@Override
@@ -188,7 +165,10 @@ public class HBaseResourceStore extends ResourceStore {
Scan scan = new Scan(startRow, endRow);
scan.addColumn(B_FAMILY, B_COLUMN_TS);
scan.addColumn(B_FAMILY, B_COLUMN);
- scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis));
+ FilterList filterList = generateTimeFilterList(timeStartInMillis, timeEndInMillis);
+ if (filterList != null) {
+ scan.setFilter(filterList);
+ }
tuneScanParameters(scan);
HTableInterface table = getConnection().getTable(getAllInOneTableName());
@@ -218,11 +198,15 @@ public class HBaseResourceStore extends ResourceStore {
private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) {
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
- SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis));
- filterList.addFilter(timeStartFilter);
- SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis));
- filterList.addFilter(timeEndFilter);
- return filterList;
+ if (timeStartInMillis != -1L) {
+ SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis));
+ filterList.addFilter(timeStartFilter);
+ }
+ if (timeEndInMillis != -1L) {
+ SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis));
+ filterList.addFilter(timeEndFilter);
+ }
+ return filterList.getFilters().size() == 0 ? null : filterList;
}
private InputStream getInputStream(String resPath, Result r) throws IOException {
@@ -325,7 +309,7 @@ public class HBaseResourceStore extends ResourceStore {
byte[] rowkey = Bytes.toBytes(path);
Get get = new Get(rowkey);
-
+
if (!fetchContent && !fetchTimestamp) {
get.setCheckExistenceOnly(true);
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index a828728..f71d0f8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -7,15 +7,16 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob;
-import org.apache.kylin.storage.hbase.ii.IICreateHFileJob;
-import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.realization.IRealizationSegment;
import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHFileJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -41,16 +42,16 @@ public class HBaseMRSteps extends JobBuilderSupport {
public MapReduceExecutable createRangeRowkeyDistributionStep(String cuboidRootPath, String jobId) {
String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
-
+
MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
StringBuilder cmd = new StringBuilder();
appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getRealization().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getRowkeyDistributionOutputPath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Region_Splits_Calculator_" + seg.getRealization().getName() + "_Step");
rowkeyDistributionStep.setMapReduceParams(cmd.toString());
rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
@@ -69,10 +70,10 @@ public class HBaseMRSteps extends JobBuilderSupport {
HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
- appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
createHtableStep.setJobParams(cmd.toString());
createHtableStep.setJobClass(CreateHTableJob.class);
@@ -82,18 +83,18 @@ public class HBaseMRSteps extends JobBuilderSupport {
public MapReduceExecutable createConvertCuboidToHfileStep(String cuboidRootPath, String jobId) {
String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
-
+
MapReduceExecutable createHFilesStep = new MapReduceExecutable();
createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
StringBuilder cmd = new StringBuilder();
appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
createHFilesStep.setMapReduceParams(cmd.toString());
createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
@@ -107,9 +108,9 @@ public class HBaseMRSteps extends JobBuilderSupport {
bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
bulkLoadStep.setJobParams(cmd.toString());
bulkLoadStep.setJobClass(BulkLoadJob.class);
@@ -125,7 +126,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
}
public List<String> getMergingHTables() {
- final List<CubeSegment> mergingSegments = ((CubeInstance)seg.getRealization()).getMergingSegments((CubeSegment)seg);
+ final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
final List<String> mergingHTables = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
@@ -135,7 +136,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
}
public List<String> getMergingHDFSPaths() {
- final List<CubeSegment> mergingSegments = ((CubeInstance)seg.getRealization()).getMergingSegments((CubeSegment)seg);
+ final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
final List<String> mergingHDFSPaths = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
@@ -187,15 +188,14 @@ public class HBaseMRSteps extends JobBuilderSupport {
jobFlow.addTask(createCreateIIHTableStep(seg));
final String iiPath = rootPath + "*";
-
+
// generate hfiles step
jobFlow.addTask(createConvertIIToHfileStep(seg, iiPath, jobFlow.getId()));
// bulk load step
jobFlow.addTask(createIIBulkLoadStep(seg, jobFlow.getId()));
-
- }
+ }
public void addInvertedIndexGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
String jobId = jobFlow.getId();
@@ -211,14 +211,12 @@ public class HBaseMRSteps extends JobBuilderSupport {
jobFlow.addTask(step);
}
-
-
private HadoopShellExecutable createCreateIIHTableStep(IRealizationSegment seg) {
HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
createHtableStep.setJobParams(cmd.toString());
createHtableStep.setJobClass(IICreateHTableJob.class);
@@ -232,11 +230,11 @@ public class HBaseMRSteps extends JobBuilderSupport {
StringBuilder cmd = new StringBuilder();
appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
- appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
createHFilesStep.setMapReduceParams(cmd.toString());
createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
@@ -249,9 +247,9 @@ public class HBaseMRSteps extends JobBuilderSupport {
bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
bulkLoadStep.setJobParams(cmd.toString());
bulkLoadStep.setJobClass(IIBulkLoadJob.class);
@@ -259,5 +257,5 @@ public class HBaseMRSteps extends JobBuilderSupport {
return bulkLoadStep;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 4cc4794..fa62a62 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -76,7 +76,7 @@ public class HBaseStreamingOutput implements IStreamingOutput {
CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
FSDataInputStream inputStream = null;
try {
- inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION));
+ inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME));
ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), inputStream, System.currentTimeMillis());
} finally {
IOUtils.closeQuietly(inputStream);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
index 2ff7356..8b5daa3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
@@ -99,11 +99,11 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
- job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
- job.getConfiguration().set(BatchConstants.HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
- job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
- job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
- job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
+ job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+ job.getConfiguration().set(BatchConstants.CFG_HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
+ job.getConfiguration().set(BatchConstants.CFG_REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
+ job.getConfiguration().set(BatchConstants.CFG_REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
+ job.getConfiguration().set(BatchConstants.CFG_REGION_NUMBER_MIN, String.valueOf(minRegionCount));
// The partition file for hfile is a sequence file consisting of ImmutableBytesWritable and NullWritable
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
index acdab62..4e53ca4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
@@ -57,24 +57,24 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
protected void setup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
- if (context.getConfiguration().get(BatchConstants.OUTPUT_PATH) != null) {
- output = context.getConfiguration().get(BatchConstants.OUTPUT_PATH);
+ if (context.getConfiguration().get(BatchConstants.CFG_OUTPUT_PATH) != null) {
+ output = context.getConfiguration().get(BatchConstants.CFG_OUTPUT_PATH);
}
- if (context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB) != null) {
- hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB));
+ if (context.getConfiguration().get(BatchConstants.CFG_HFILE_SIZE_GB) != null) {
+ hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_HFILE_SIZE_GB));
}
- if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
- cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
+ if (context.getConfiguration().get(BatchConstants.CFG_REGION_SPLIT_SIZE) != null) {
+ cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_SPLIT_SIZE));
}
- if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
- minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
+ if (context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MIN) != null) {
+ minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MIN));
}
- if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
- maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
+ if (context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MAX) != null) {
+ maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MAX));
}
logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount
[2/2] kylin git commit: clean code
Posted by ma...@apache.org.
clean code
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5e13bba0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5e13bba0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5e13bba0
Branch: refs/heads/master
Commit: 5e13bba0822cafeb02817e5e59c08a3a4b5020c9
Parents: 4662ada
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Mar 11 14:03:45 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 11 15:00:35 2016 +0800
----------------------------------------------------------------------
.../kylin/common/persistence/ResourceStore.java | 26 ++-----
.../java/org/apache/kylin/cube/CubeSegment.java | 31 +++++---
.../org/apache/kylin/cube/cuboid/Cuboid.java | 26 +++----
.../kylin/job/constant/ExecutableConstants.java | 20 -----
.../kylin/job/execution/AbstractExecutable.java | 27 +++----
.../kylin/engine/mr/BatchCubingJobBuilder.java | 25 ++++---
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 48 ++++++------
.../kylin/engine/mr/BatchMergeJobBuilder.java | 13 ++--
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 22 +++---
.../org/apache/kylin/engine/mr/CubingJob.java | 56 ++++----------
.../kylin/engine/mr/JobBuilderSupport.java | 63 +++++++++-------
.../engine/mr/common/AbstractHadoopJob.java | 33 ++++-----
.../kylin/engine/mr/common/BatchConstants.java | 59 ++++++++++-----
.../kylin/engine/mr/common/CubeStatsReader.java | 56 +++++++-------
.../kylin/engine/mr/common/CuboidStatsUtil.java | 2 +-
.../mr/invertedindex/BatchIIJobBuilder.java | 21 +++---
.../mr/invertedindex/InvertedIndexJob.java | 7 +-
.../engine/mr/steps/BaseCuboidMapperBase.java | 2 +-
.../engine/mr/steps/CubingExecutableUtil.java | 78 ++++++++++++++++++++
.../kylin/engine/mr/steps/CuboidReducer.java | 2 +-
.../engine/mr/steps/FactDistinctColumnsJob.java | 2 +-
.../mr/steps/FactDistinctColumnsMapperBase.java | 2 +-
.../mr/steps/FactDistinctColumnsReducer.java | 4 +-
.../engine/mr/steps/HiveToBaseCuboidMapper.java | 2 +-
.../engine/mr/steps/InMemCuboidMapper.java | 2 +-
.../engine/mr/steps/InMemCuboidReducer.java | 2 +-
.../engine/mr/steps/MergeDictionaryStep.java | 47 +-----------
.../engine/mr/steps/MergeStatisticsStep.java | 66 ++---------------
.../kylin/engine/mr/steps/NDCuboidMapper.java | 4 +-
.../engine/mr/steps/SaveStatisticsStep.java | 49 ++----------
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 38 ++--------
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 57 ++------------
.../apache/kylin/engine/spark/SparkCubing.java | 3 +-
.../engine/spark/SparkCubingJobBuilder.java | 6 +-
.../apache/kylin/rest/service/BasicService.java | 40 +---------
.../apache/kylin/rest/service/JobService.java | 51 +++++++------
.../cardinality/ColumnCardinalityMapper.java | 2 +-
.../cardinality/HiveColumnCardinalityJob.java | 3 +-
.../kylin/storage/hbase/HBaseResourceStore.java | 46 ++++--------
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 78 ++++++++++----------
.../hbase/steps/HBaseStreamingOutput.java | 2 +-
.../hbase/steps/RangeKeyDistributionJob.java | 10 +--
.../steps/RangeKeyDistributionReducer.java | 20 ++---
43 files changed, 483 insertions(+), 670 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index ccae80b..88ee553 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -73,7 +73,7 @@ abstract public class ResourceStore {
}
return knownImpl;
}
-
+
private static ResourceStore createResourceStore(KylinConfig kylinConfig) {
List<Throwable> es = new ArrayList<Throwable>();
logger.info("Using metadata url " + kylinConfig.getMetadataUrl() + " for resource store");
@@ -141,7 +141,7 @@ abstract public class ResourceStore {
RawResource res = getResourceImpl(resPath);
if (res == null)
return null;
-
+
DataInputStream din = new DataInputStream(res.inputStream);
try {
T r = serializer.deserialize(din);
@@ -160,25 +160,9 @@ abstract public class ResourceStore {
final public long getResourceTimestamp(String resPath) throws IOException {
return getResourceTimestampImpl(norm(resPath));
}
-
+
final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, Class<T> clazz, Serializer<T> serializer) throws IOException {
- final List<RawResource> allResources = getAllResources(rangeStart, rangeEnd);
- if (allResources.isEmpty()) {
- return Collections.emptyList();
- }
- List<T> result = Lists.newArrayList();
- try {
- for (RawResource rawResource : allResources) {
- final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream));
- element.setLastModified(rawResource.timestamp);
- result.add(element);
- }
- return result;
- } finally {
- for (RawResource rawResource : allResources) {
- IOUtils.closeQuietly(rawResource.inputStream);
- }
- }
+ return getAllResources(rangeStart, rangeEnd, -1L, -1L, clazz, serializer);
}
final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis, Class<T> clazz, Serializer<T> serializer) throws IOException {
@@ -210,7 +194,7 @@ abstract public class ResourceStore {
/** returns 0 if not exists */
abstract protected long getResourceTimestampImpl(String resPath) throws IOException;
-
+
/**
* overwrite a resource without write conflict check
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 67dce73..5b61c10 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -18,12 +18,12 @@
package org.apache.kylin.cube;
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ShardingHash;
@@ -37,11 +37,12 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.IRealizationSegment;
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonBackReference;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IRealizationSegment {
@@ -76,6 +77,9 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
@JsonProperty("total_shards")
private int totalShards = 0;
+ @JsonProperty("blackout_cuboids")
+ private List<Long> blackoutCuboids = Lists.newArrayList();
+
@JsonProperty("binary_signature")
private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check
@@ -391,6 +395,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
/**
* get the number of shards where each cuboid will distribute
+ *
* @return
*/
public Short getCuboidShardNum(Long cuboidId) {
@@ -423,6 +428,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
return ret;
}
+ public List<Long> getBlackoutCuboids() {
+ return this.blackoutCuboids;
+ }
+
@Override
public IRealization getRealization() {
return cubeInstance;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 16b0287..89f5204 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -18,10 +18,14 @@
package org.apache.kylin.cube.cuboid;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
@@ -31,13 +35,10 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.RowKeyColDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
public class Cuboid implements Comparable<Cuboid> {
@@ -231,8 +232,7 @@ public class Cuboid implements Comparable<Cuboid> {
return true;
}
- hier:
- for (HierarchyMask hierarchyMasks : hierarchyMaskList) {
+ hier: for (HierarchyMask hierarchyMasks : hierarchyMaskList) {
long result = cuboidID & hierarchyMasks.fullMask;
if (result > 0) {
for (long mask : hierarchyMasks.allMasks) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index d370b0d..d3c1003 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -26,21 +26,15 @@ public final class ExecutableConstants {
}
public static final String YARN_APP_ID = "yarn_application_id";
-
public static final String YARN_APP_URL = "yarn_application_tracking_url";
public static final String MR_JOB_ID = "mr_job_id";
public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
public static final String SOURCE_RECORDS_COUNT = "source_records_count";
public static final String SOURCE_RECORDS_SIZE = "source_records_size";
- public static final String GLOBAL_LISTENER_NAME = "ChainListener";
public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
- public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
-
- public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
-
public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
@@ -57,23 +51,9 @@ public final class ExecutableConstants {
public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
-
public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
public static final String STEP_NAME_UPDATE_II_INFO = "Update Inverted Index Info";
- public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
- public static final String PROP_JOB_FLOW = "jobFlow";
- public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
- public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
- public static final String PROP_COMMAND = "command";
- // public static final String PROP_STORAGE_LOCATION =
- // "storageLocationIdentifier";
- public static final String PROP_JOB_ASYNC = "jobAsync";
- public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
- public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
- public static final String PROP_JOB_KILLED = "jobKilled";
- public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
-
public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 8d5fea5..83c61ae 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -18,10 +18,13 @@
package org.apache.kylin.job.execution;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kylin.common.KylinConfig;
@@ -32,12 +35,10 @@ import org.apache.kylin.job.manager.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
*/
@@ -117,12 +118,12 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
}
retry++;
} while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true);
-
+
if (exception != null) {
onExecuteError(exception, executableContext);
throw new ExecuteException(exception);
}
-
+
onExecuteFinished(result, executableContext);
return result;
}
@@ -164,7 +165,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
@Override
public final Map<String, String> getParams() {
- return Collections.unmodifiableMap(this.params);
+ return this.params;
}
public final String getParam(String key) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 3c10c09..45d03d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
import org.apache.kylin.engine.mr.steps.NDCuboidJob;
@@ -92,12 +93,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
- appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
- appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
- appendExecCmdParameters(cmd, "level", "0");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
@@ -113,12 +114,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
StringBuilder cmd = new StringBuilder();
appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
- appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
- appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
- appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + (totalRowkeyColumnCount - dimNum));
ndCuboidStep.setMapReduceParams(cmd.toString());
ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 86439a8..5f4a3ed 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -22,8 +22,10 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
import org.apache.kylin.engine.mr.steps.NDCuboidJob;
import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
@@ -89,10 +91,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
SaveStatisticsStep result = new SaveStatisticsStep();
result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
- result.setCubeName(seg.getRealization().getName());
- result.setSegmentId(seg.getUuid());
- result.setStatisticsPath(getStatisticsPath(jobId));
- result.setCubingJobId(jobId);
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setStatisticsPath(getStatisticsPath(jobId), result.getParams());
+ CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
return result;
}
@@ -105,11 +107,11 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "output", cuboidRootPath);
- appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName());
- appendExecCmdParameters(cmd, "cubingJobId", jobId);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
cubeStep.setMapReduceParams(cmd.toString());
cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
@@ -126,13 +128,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
- appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
- appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
- appendExecCmdParameters(cmd, "level", "0");
- appendExecCmdParameters(cmd, "cubingJobId", jobId);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
@@ -148,13 +150,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
StringBuilder cmd = new StringBuilder();
appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
- appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
- appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
- appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
- appendExecCmdParameters(cmd, "cubingJobId", jobId);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + (totalRowkeyColumnCount - dimNum));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
ndCuboidStep.setMapReduceParams(cmd.toString());
ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index 831aa9d..6f1d445 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -48,7 +49,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
public CubingJob build() {
logger.info("MR_V1 new job to MERGE segment " + seg);
- final CubeSegment cubeSegment = (CubeSegment)seg;
+ final CubeSegment cubeSegment = (CubeSegment) seg;
final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
final String jobId = result.getId();
@@ -84,11 +85,11 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
StringBuilder cmd = new StringBuilder();
appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", outputPath);
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 008d489..e151674 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -22,7 +22,9 @@ import java.util.List;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -79,10 +81,12 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
MergeStatisticsStep result = new MergeStatisticsStep();
result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setMergedStatisticsPath(mergedStatisticsFolder);
+
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
+ CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
+
return result;
}
@@ -92,11 +96,11 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
StringBuilder cmd = new StringBuilder();
appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", outputPath);
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 979ff75..0325a09 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -24,17 +24,16 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
-import java.util.List;
import java.util.TimeZone;
import java.util.regex.Matcher;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -47,8 +46,8 @@ import org.apache.kylin.job.execution.Output;
/**
*/
public class CubingJob extends DefaultChainedExecutable {
-
- public static enum AlgorithmEnum {
+
+ public enum AlgorithmEnum {
LAYER, INMEM
}
@@ -58,9 +57,6 @@ public class CubingJob extends DefaultChainedExecutable {
public static final String CUBE_SIZE_BYTES = "byteSizeBytes";
public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
- private static final String CUBE_INSTANCE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
-
public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) {
return initCubingJob(seg, "BUILD", submitter, config);
}
@@ -73,8 +69,8 @@ public class CubingJob extends DefaultChainedExecutable {
CubingJob result = new CubingJob();
SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
+ CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + jobType + " - " + format.format(new Date(System.currentTimeMillis())));
result.setSubmitter(submitter);
result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
@@ -85,29 +81,9 @@ public class CubingJob extends DefaultChainedExecutable {
super();
}
- void setCubeName(String name) {
- setParam(CUBE_INSTANCE_NAME, name);
- }
-
- public String getCubeName() {
- return getParam(CUBE_INSTANCE_NAME);
- }
-
- void setSegmentIds(List<String> segmentIds) {
- setParam(SEGMENT_ID, StringUtils.join(segmentIds, ","));
- }
-
- void setSegmentId(String segmentId) {
- setParam(SEGMENT_ID, segmentId);
- }
-
- public String getSegmentIds() {
- return getParam(SEGMENT_ID);
- }
-
@Override
protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
- CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName());
+ CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(CubingExecutableUtil.getCubeName(this.getParams()));
final Output output = jobService.getOutput(getId());
String logMsg;
state = output.getState();
@@ -131,7 +107,7 @@ public class CubingJob extends DefaultChainedExecutable {
String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
content = content.replaceAll("\\$\\{job_name\\}", getName());
content = content.replaceAll("\\$\\{result\\}", state.toString());
- content = content.replaceAll("\\$\\{cube_name\\}", getCubeName());
+ content = content.replaceAll("\\$\\{cube_name\\}", CubingExecutableUtil.getCubeName(this.getParams()));
content = content.replaceAll("\\$\\{source_records_count\\}", String.valueOf(findSourceRecordCount()));
content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString());
content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins");
@@ -147,7 +123,7 @@ public class CubingJob extends DefaultChainedExecutable {
logger.warn(e.getLocalizedMessage(), e);
}
- String title = "[" + state.toString() + "] - [Kylin Cube Build Job]-" + getCubeName();
+ String title = "[" + state.toString() + "] - [Kylin Cube Build Job]-" + CubingExecutableUtil.getCubeName(this.getParams());
return Pair.of(title, content);
}
@@ -174,11 +150,11 @@ public class CubingJob extends DefaultChainedExecutable {
public void setMapReduceWaitTime(long t) {
addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
}
-
+
public void setAlgorithm(AlgorithmEnum alg) {
addExtraInfo("algorithm", alg.name());
}
-
+
public AlgorithmEnum getAlgorithm() {
String alg = getExtraInfo().get("algorithm");
return alg == null ? null : AlgorithmEnum.valueOf(alg);
@@ -187,11 +163,11 @@ public class CubingJob extends DefaultChainedExecutable {
public boolean isLayerCubing() {
return AlgorithmEnum.LAYER == getAlgorithm();
}
-
+
public boolean isInMemCubing() {
return AlgorithmEnum.INMEM == getAlgorithm();
}
-
+
public long findSourceRecordCount() {
return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
}
@@ -204,7 +180,7 @@ public class CubingJob extends DefaultChainedExecutable {
// look for the info BACKWARD, let the last step that claims the cube size win
return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
}
-
+
public String findExtraInfo(String key, String dft) {
return findExtraInfo(key, dft, false);
}
@@ -212,14 +188,14 @@ public class CubingJob extends DefaultChainedExecutable {
public String findExtraInfoBackward(String key, String dft) {
return findExtraInfo(key, dft, true);
}
-
+
private String findExtraInfo(String key, String dft, boolean backward) {
ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
-
+
if (backward) {
Collections.reverse(tasks);
}
-
+
for (AbstractExecutable child : tasks) {
Output output = executableManager.getOutput(child.getId());
String value = output.getExtra().get(key);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index e3b07d8..a3fef8e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,21 +23,23 @@ import java.util.List;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.invertedindex.UpdateIIInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
-
-import com.google.common.base.Preconditions;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.realization.IRealizationSegment;
+import com.google.common.base.Preconditions;
+
/**
* Hold reusable steps for builders.
*/
@@ -67,14 +69,14 @@ public class JobBuilderSupport {
result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
result.setMapReduceJobClass(FactDistinctColumnsJob.class);
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
- appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
- appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
+ appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getFactDistinctColumnsPath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
result.setMapReduceParams(cmd.toString());
return result;
@@ -85,9 +87,9 @@ public class JobBuilderSupport {
HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId));
buildDictionaryStep.setJobParams(cmd.toString());
buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
@@ -95,34 +97,38 @@ public class JobBuilderSupport {
}
public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
- final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
- updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- updateCubeInfoStep.setCubeName(seg.getRealization().getName());
- updateCubeInfoStep.setSegmentId(seg.getUuid());
- updateCubeInfoStep.setCubingJobId(jobId);
- return updateCubeInfoStep;
+ final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep();
+ result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+
+ return result;
}
public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
MergeDictionaryStep result = new MergeDictionaryStep();
result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
- result.setCubeName(seg.getRealization().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
+
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
+
return result;
}
public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- result.setCubeName(seg.getRealization().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setCubingJobId(jobId);
- return result;
- }
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+ CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
+ return result;
+ }
public UpdateIIInfoAfterBuildStep createUpdateIIInfoAfterBuildStep(String jobId) {
final UpdateIIInfoAfterBuildStep updateIIInfoStep = new UpdateIIInfoAfterBuildStep();
@@ -141,6 +147,7 @@ public class JobBuilderSupport {
public String getRealizationRootPath(String jobId) {
return getJobWorkingDir(jobId) + "/" + seg.getRealization().getName();
}
+
public String getCuboidRootPath(String jobId) {
return getRealizationRootPath(jobId) + "/cuboid/";
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 61983d5..fe60ca8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -23,7 +23,7 @@ package org.apache.kylin.engine.mr.common;
*
*/
-import static org.apache.hadoop.util.StringUtils.*;
+import static org.apache.hadoop.util.StringUtils.formatTime;
import java.io.File;
import java.io.IOException;
@@ -74,22 +74,21 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
- protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
- protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
- protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName("cubingJobId").hasArg().isRequired(false).withDescription("ID of cubing job executable").create("cubingJobId");
- protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
- protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
- protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
- protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
- protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
- protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
- protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
- protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions");
- protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
-
- protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");
- protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName("statisticsoutput").hasArg().isRequired(false).withDescription("Statistics output").create("statisticsoutput");
- protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName("statisticssamplingpercent").hasArg().isRequired(false).withDescription("Statistics sampling percentage").create("statisticssamplingpercent");
+ protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME);
+ protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For example, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME);
+ protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID);
+ protected static final Option OPTION_II_NAME = OptionBuilder.withArgName(BatchConstants.ARG_II_NAME).hasArg().isRequired(true).withDescription("II name. For example, some_ii").create(BatchConstants.ARG_II_NAME);
+ protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName( BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create( BatchConstants.ARG_SEGMENT_NAME);
+ protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT);
+ protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT).hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT);
+ protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT);
+ protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL);
+ protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION).hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
+ protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME).hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME);
+
+ protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false).withDescription("Statistics enabled").create(BatchConstants.ARG_STATS_ENABLED);
+ protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT).hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT);
+ protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false).withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
protected String name;
protected boolean isAsync = false;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 6943f18..a614f4b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -20,8 +20,15 @@ package org.apache.kylin.engine.mr.common;
public interface BatchConstants {
+ /**
+ * source data config
+ */
char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
+ /**
+ * Configuration (CFG_*) entry names for MR jobs
+ */
+
String CFG_CUBE_NAME = "cube.name";
String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level";
@@ -29,31 +36,47 @@ public interface BatchConstants {
String CFG_II_NAME = "ii.name";
String CFG_II_SEGMENT_NAME = "ii.segment.name";
- String OUTPUT_PATH = "output.path";
-
- String TABLE_NAME = "table.name";
- String TABLE_COLUMNS = "table.columns";
-
+ String CFG_OUTPUT_PATH = "output.path";
+ String CFG_TABLE_NAME = "table.name";
String CFG_IS_MERGE = "is.merge";
String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
+ String CFG_REGION_NUMBER_MIN = "region.number.min";
+ String CFG_REGION_NUMBER_MAX = "region.number.max";
+ String CFG_REGION_SPLIT_SIZE = "region.split.size";
+ String CFG_HFILE_SIZE_GB = "hfile.size.gb";
- String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
-
- String REGION_NUMBER_MIN = "region.number.min";
- String REGION_NUMBER_MAX = "region.number.max";
- String REGION_SPLIT_SIZE = "region.split.size";
- String HFILE_SIZE_GB = "hfile.size.gb";
-
String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/";
-
+
String CFG_STATISTICS_LOCAL_DIR = CFG_KYLIN_LOCAL_TEMP_DIR + "cuboidstatistics/";
String CFG_STATISTICS_ENABLED = "statistics.enabled";
- String CFG_STATISTICS_OUTPUT = "statistics.ouput";
+ String CFG_STATISTICS_OUTPUT = "statistics.ouput";// typo ("ouput") is intentional: the key is persisted, so changing it would break compatibility
String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
- String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
- String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
+ String CFG_STATISTICS_CUBE_ESTIMATION_FILENAME = "cube_statistics.txt";
+ String CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME = "cuboid_statistics.seq";
- int COUNTER_MAX = 100000;
- int ERROR_RECORD_THRESHOLD = 100;
+ /**
+ * command line ARGuments
+ */
+ String ARG_INPUT = "input";
+ String ARG_OUTPUT = "output";
+ String ARG_JOB_NAME = "jobname";
+ String ARG_CUBING_JOB_ID = "cubingJobId";
+ String ARG_CUBE_NAME = "cubename";
+ String ARG_II_NAME = "iiname";
+ String ARG_SEGMENT_NAME = "segmentname";
+ String ARG_PARTITION = "partitions";
+ String ARG_STATS_ENABLED= "statisticsenabled";
+ String ARG_STATS_OUTPUT= "statisticsoutput";
+ String ARG_STATS_SAMPLING_PERCENT= "statisticssamplingpercent";
+ String ARG_HTABLE_NAME= "htablename";
+ String ARG_INPUT_FORMAT= "inputformat";
+ String ARG_LEVEL= "level";
+
+ /**
+ * logger and counter
+ */
+ String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
+ int NORMAL_RECORD_LOG_THRESHOLD = 100000;
+ int ERROR_RECORD_LOG_THRESHOLD = 100;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 57e93c3..00b9e32 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -18,19 +18,8 @@
package org.apache.kylin.engine.mr.common;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -61,9 +50,16 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
/**
* This should be in cube module. It's here in engine-mr because currently stats
@@ -76,7 +72,7 @@ public class CubeStatsReader {
final CubeSegment seg;
final int samplingPercentage;
final double mapperOverlapRatioOfFirstBuild; // only makes sense for the first build, is meaningless after merge
- final Map<Long, HyperLogLogPlusCounter> cuboidRowCountMap;
+ final Map<Long, HyperLogLogPlusCounter> cuboidRowEstimatesHLL;
public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
ResourceStore store = ResourceStore.getStore(kylinConfig);
@@ -112,7 +108,7 @@ public class CubeStatsReader {
this.seg = cubeSegment;
this.samplingPercentage = percentage;
this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio;
- this.cuboidRowCountMap = counterMap;
+ this.cuboidRowEstimatesHLL = counterMap;
} finally {
IOUtils.closeStream(reader);
@@ -133,13 +129,13 @@ public class CubeStatsReader {
return tempFile;
}
- public Map<Long, Long> getCuboidRowCountMap() {
- return getCuboidRowCountMapFromSampling(cuboidRowCountMap, samplingPercentage);
+ public Map<Long, Long> getCuboidRowEstimatesHLL() {
+ return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
}
// return map of Cuboid ID => MB
public Map<Long, Double> getCuboidSizeMap() {
- return getCuboidSizeMapFromRowCount(seg, getCuboidRowCountMap());
+ return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL());
}
public double getMapperOverlapRatioOfFirstBuild() {
@@ -147,15 +143,13 @@ public class CubeStatsReader {
}
public static Map<Long, Long> getCuboidRowCountMapFromSampling(Map<Long, HyperLogLogPlusCounter> hllcMap, int samplingPercentage) {
- return Maps.transformValues(hllcMap, new Function<HyperLogLogPlusCounter, Long>() {
- @Nullable
- @Override
- public Long apply(HyperLogLogPlusCounter input) {
- // No need to adjust according sampling percentage. Assumption is that data set is far
- // more than cardinality. Even a percentage of the data should already see all cardinalities.
- return input.getCountEstimate();
- }
- });
+ Map<Long, Long> cuboidRowCountMap = Maps.newHashMap();
+ for (Map.Entry<Long, HyperLogLogPlusCounter> entry : hllcMap.entrySet()) {
+ // No need to adjust according sampling percentage. Assumption is that data set is far
+ // more than cardinality. Even a percentage of the data should already see all cardinalities.
+ cuboidRowCountMap.put(entry.getKey(), entry.getValue().getCountEstimate());
+ }
+ return cuboidRowCountMap;
}
public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
@@ -219,7 +213,7 @@ public class CubeStatsReader {
}
private void print(PrintWriter out) {
- Map<Long, Long> cuboidRows = getCuboidRowCountMap();
+ Map<Long, Long> cuboidRows = getCuboidRowEstimatesHLL();
Map<Long, Double> cuboidSizes = getCuboidSizeMap();
List<Long> cuboids = new ArrayList<Long>(cuboidRows.keySet());
Collections.sort(cuboids);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
index 02fe0f0..cb4b1cb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
@@ -43,7 +43,7 @@ public class CuboidStatsUtil {
}
public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage, double mapperOverlapRatio) throws IOException {
- Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+ Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
List<Long> allCuboids = new ArrayList<Long>();
allCuboids.addAll(cuboidHLLMap.keySet());
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
index e7501b8..4841e64 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -29,9 +30,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BatchIIJobBuilder extends JobBuilderSupport {
-
+
private static final Logger logger = LoggerFactory.getLogger(BatchIIJobBuilder.class);
-
+
private final IMRBatchCubingInputSide inputSide;
private final IMROutput.IMRBatchInvertedIndexingOutputSide outputSide;
@@ -44,15 +45,15 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
public IIJob build() {
logger.info("MR new job to BUILD segment " + seg);
- final IIJob result = IIJob.createBuildJob((IISegment)seg, submitter, config);
+ final IIJob result = IIJob.createBuildJob((IISegment) seg, submitter, config);
final String jobId = result.getId();
-
+
final String iiRootPath = getRealizationRootPath(jobId) + "/";
// Phase 1: Create Flat Table
inputSide.addStepPhase1_CreateFlatTable(result);
-
+
// Phase 2: Build Inverted Index
- result.addTask(createInvertedIndexStep((IISegment)seg, iiRootPath));
+ result.addTask(createInvertedIndexStep((IISegment) seg, iiRootPath));
outputSide.addStepPhase3_BuildII(result, iiRootPath);
// Phase 3: Update Metadata & Cleanup
@@ -71,13 +72,13 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
- appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
- appendExecCmdParameters(cmd, "output", iiOutputTempPath);
- appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, iiOutputTempPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, ExecutableConstants.STEP_NAME_BUILD_II);
buildIIStep.setMapReduceParams(cmd.toString());
buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
return buildIIStep;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
index 27505e6..9ea2411 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
@@ -19,14 +19,12 @@
package org.apache.kylin.engine.mr.invertedindex;
import java.io.IOException;
-import java.util.ArrayList;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -39,8 +37,6 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +102,6 @@ public class InvertedIndexJob extends AbstractHadoopJob {
conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
}
-
private void setupMapper(IISegment segment) throws IOException {
IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
@@ -127,7 +122,7 @@ public class InvertedIndexJob extends AbstractHadoopJob {
FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+ job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
deletePath(job.getConfiguration(), output);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 3dddece..5fb9098 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -195,7 +195,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
// TODO expose errorRecordCounter as hadoop counter
errorRecordCounter++;
- if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+ if (errorRecordCounter > BatchConstants.ERROR_RECORD_LOG_THRESHOLD) {
if (ex instanceof IOException)
throw (IOException) ex;
else if (ex instanceof RuntimeException)
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
new file mode 100644
index 0000000..71f27f2
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
@@ -0,0 +1,78 @@
+// NOTE(review): this new file is missing the standard Apache license header used
+// throughout the code base -- confirm before merge.
+package org.apache.kylin.engine.mr.steps;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Static accessors for cubing-step settings kept in an executable's
+ * String-to-String parameter map. Replaces the per-step setter/getter
+ * fields that the individual step classes used to declare themselves.
+ */
+// NOTE(review): utility class -- consider adding a private constructor to
+// prevent instantiation.
+public class CubingExecutableUtil {
+
+ // Keys under which the settings are stored in the parameter map.
+ public static final String CUBE_NAME = "cubeName";
+ public static final String SEGMENT_ID = "segmentId";
+ public static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
+ public static final String STATISTICS_PATH = "statisticsPath";
+ public static final String CUBING_JOB_ID = "cubingJobId";
+ public static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
+
+ /** Stores the cuboid statistics path under {@link #STATISTICS_PATH}. */
+ public static void setStatisticsPath(String path, Map<String, String> params) {
+ params.put(STATISTICS_PATH, path);
+ }
+
+ /** @return the stored statistics path, or null if never set */
+ public static String getStatisticsPath(Map<String, String> params) {
+ return params.get(STATISTICS_PATH);
+ }
+
+ /** Stores the cube name under {@link #CUBE_NAME}. */
+ public static void setCubeName(String cubeName, Map<String, String> params) {
+ params.put(CUBE_NAME, cubeName);
+ }
+
+ /** @return the stored cube name, or null if never set */
+ public static String getCubeName(Map<String, String> params) {
+ return params.get(CUBE_NAME);
+ }
+
+ /** Stores the segment UUID under {@link #SEGMENT_ID}. */
+ public static void setSegmentId(String segmentId, Map<String, String> params) {
+ params.put(SEGMENT_ID, segmentId);
+ }
+
+ /** @return the stored segment UUID, or null if never set */
+ public static String getSegmentId(Map<String, String> params) {
+ return params.get(SEGMENT_ID);
+ }
+
+ /** Stores the merging segment ids as a single comma-joined string. */
+ public static void setMergingSegmentIds(List<String> ids, Map<String, String> params) {
+ params.put(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
+ }
+
+ /**
+ * Splits the comma-joined value back into a list.
+ * @return the stored ids, or an empty list when the key was never set
+ */
+ public static List<String> getMergingSegmentIds(Map<String, String> params) {
+ final String ids = params.get(MERGING_SEGMENT_IDS);
+ if (ids != null) {
+ final String[] splitted = StringUtils.split(ids, ",");
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+ for (String id : splitted) {
+ result.add(id);
+ }
+ return result;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ /** Stores the parent cubing-job id under {@link #CUBING_JOB_ID}. */
+ public static void setCubingJobId(String id, Map<String, String> params) {
+ params.put(CUBING_JOB_ID, id);
+ }
+
+ /** @return the stored cubing-job id, or null if never set */
+ public static String getCubingJobId(Map<String, String> params) {
+ return params.get(CUBING_JOB_ID);
+ }
+
+ /** Stores the merged statistics path under {@link #MERGED_STATISTICS_PATH}. */
+ public static void setMergedStatisticsPath(String path, Map<String, String> params) {
+ params.put(MERGED_STATISTICS_PATH, path);
+ }
+
+ /** @return the stored merged statistics path, or null if never set */
+ public static String getMergedStatisticsPath(Map<String, String> params) {
+ return params.get(MERGED_STATISTICS_PATH);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index 4dbb53e..f263d99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -108,7 +108,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
context.write(key, outputValue);
counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
+ if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
logger.info("Handled " + counter + " records!");
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 1373e2c..2dabb7a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -125,7 +125,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
job.setNumReduceTasks(numberOfReducers);
FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+ job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
deletePath(job.getConfiguration(), output);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 1412dfb..cc7b6df 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -71,7 +71,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
ex.printStackTrace(System.err);
errorRecordCounter++;
- if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+ if (errorRecordCounter > BatchConstants.ERROR_RECORD_LOG_THRESHOLD) {
if (ex instanceof IOException)
throw (IOException) ex;
else if (ex instanceof RuntimeException)
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index f3e0290..f43834d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -128,7 +128,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException {
final Configuration conf = context.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
- final String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+ final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
final Path outputFile = new Path(outputPath, col.getName());
FSDataOutputStream out = null;
@@ -176,7 +176,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
private void writeMapperAndCuboidStatistics(Context context) throws IOException {
Configuration conf = context.getConfiguration();
FileSystem fs = FileSystem.get(conf);
- FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION));
+ FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME));
try {
String msg;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index e2a49df..8f5557d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -41,7 +41,7 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O
@Override
public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
+ if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
logger.info("Handled " + counter + " records!");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index b094b98..e42a6b5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -100,7 +100,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
while (!future.isDone()) {
if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
+ if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
logger.info("Handled " + counter + " records!");
}
break;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index 9beacbb..e72c38b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -85,7 +85,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
context.write(outputKey, outputValue);
counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
+ if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
logger.info("Handled " + counter + " records!");
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index f247a8f..264ba9b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -44,10 +43,6 @@ import com.google.common.collect.Lists;
public class MergeDictionaryStep extends AbstractExecutable {
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-
public MergeDictionaryStep() {
super();
}
@@ -56,8 +51,8 @@ public class MergeDictionaryStep extends AbstractExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig conf = context.getConfig();
final CubeManager mgr = CubeManager.getInstance(conf);
- final CubeInstance cube = mgr.getCube(getCubeName());
- final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+ final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
final List<CubeSegment> mergingSegments = getMergingSegments(cube);
Collections.sort(mergingSegments);
@@ -79,7 +74,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
}
private List<CubeSegment> getMergingSegments(CubeInstance cube) {
- List<String> mergingSegmentIds = getMergingSegmentIds();
+ List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
for (String id : mergingSegmentIds) {
result.add(cube.getSegmentById(id));
@@ -111,7 +106,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
CubeDesc cubeDesc = cube.getDescriptor();
for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {
- String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(),true, col).getTable();
+ String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(), true, col).getTable();
if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
colsNeedMeringDict.add(col);
} else {
@@ -165,38 +160,4 @@ public class MergeDictionaryStep extends AbstractExecutable {
}
}
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setMergingSegmentIds(List<String> ids) {
- setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
- }
-
- private List<String> getMergingSegmentIds() {
- final String ids = getParam(MERGING_SEGMENT_IDS);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id : splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
}